Skip to content

Commit

Permalink
Add doc_count field mapper (#3985)
Browse files Browse the repository at this point in the history
Bucket aggregations compute bucket doc_count values by incrementing 
the doc_count by 1 for every document collected in the bucket.

When using summary fields (such as aggregate_metric_double) one 
field may represent more than one document. To provide this 
functionality this commit implements a new field mapper (named 
doc_count field mapper). This field is a positive integer representing 
the number of documents aggregated in a single summary field.

Bucket aggregations check if a field of type doc_count exists in a 
document and take this value into consideration when computing doc 
counts.

Note: This originated from upstream PR 64503.

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
petardz authored Jul 28, 2022
1 parent 85bfde1 commit fb7d81a
Show file tree
Hide file tree
Showing 17 changed files with 692 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
setup:
- do:
indices.create:
index: test_1
body:
settings:
number_of_replicas: 0
mappings:
properties:
str:
type: keyword
number:
type: integer

- do:
bulk:
index: test_1
refresh: true
body:
- '{"index": {}}'
- '{"_doc_count": 10, "str": "abc", "number" : 500, "unmapped": "abc" }'
- '{"index": {}}'
- '{"_doc_count": 5, "str": "xyz", "number" : 100, "unmapped": "xyz" }'
- '{"index": {}}'
- '{"_doc_count": 7, "str": "foo", "number" : 100, "unmapped": "foo" }'
- '{"index": {}}'
- '{"_doc_count": 1, "str": "foo", "number" : 200, "unmapped": "foo" }'
- '{"index": {}}'
- '{"str": "abc", "number" : 500, "unmapped": "abc" }'

---
"Test numeric terms agg with doc_count":
- skip:
version: "1.0.0 - "
reason: "until this is released and we know exact version"

- do:
search:
rest_total_hits_as_int: true
body: { "size" : 0, "aggs" : { "num_terms" : { "terms" : { "field" : "number" } } } }

- match: { hits.total: 5 }
- length: { aggregations.num_terms.buckets: 3 }
- match: { aggregations.num_terms.buckets.0.key: 100 }
- match: { aggregations.num_terms.buckets.0.doc_count: 12 }
- match: { aggregations.num_terms.buckets.1.key: 500 }
- match: { aggregations.num_terms.buckets.1.doc_count: 11 }
- match: { aggregations.num_terms.buckets.2.key: 200 }
- match: { aggregations.num_terms.buckets.2.doc_count: 1 }

---
"Test keyword terms agg with doc_count":
- skip:
version: "1.0.0 - "
reason: "until this is released and we know exact version"
- do:
search:
rest_total_hits_as_int: true
body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }

- match: { hits.total: 5 }
- length: { aggregations.str_terms.buckets: 3 }
- match: { aggregations.str_terms.buckets.0.key: "abc" }
- match: { aggregations.str_terms.buckets.0.doc_count: 11 }
- match: { aggregations.str_terms.buckets.1.key: "foo" }
- match: { aggregations.str_terms.buckets.1.doc_count: 8 }
- match: { aggregations.str_terms.buckets.2.key: "xyz" }
- match: { aggregations.str_terms.buckets.2.doc_count: 5 }

---
"Test unmapped string terms agg with doc_count":
- skip:
version: "1.0.0 - "
reason: "until this is released and we know exact version"
- do:
bulk:
index: test_2
refresh: true
body:
- '{"index": {}}'
- '{"_doc_count": 10, "str": "abc" }'
- '{"index": {}}'
- '{"str": "abc" }'
- do:
search:
index: test_2
rest_total_hits_as_int: true
body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str.keyword" } } } }

- match: { hits.total: 2 }
- length: { aggregations.str_terms.buckets: 1 }
- match: { aggregations.str_terms.buckets.0.key: "abc" }
- match: { aggregations.str_terms.buckets.0.doc_count: 11 }

---
"Test composite str_terms agg with doc_count":
- skip:
version: "1.0.0 - "
reason: "until this is released and we know exact version"
- do:
search:
rest_total_hits_as_int: true
body: { "size" : 0, "aggs" :
{ "composite_agg" : { "composite" :
{
"sources": ["str_terms": { "terms": { "field": "str" } }]
}
}
}
}

- match: { hits.total: 5 }
- length: { aggregations.composite_agg.buckets: 3 }
- match: { aggregations.composite_agg.buckets.0.key.str_terms: "abc" }
- match: { aggregations.composite_agg.buckets.0.doc_count: 11 }
- match: { aggregations.composite_agg.buckets.1.key.str_terms: "foo" }
- match: { aggregations.composite_agg.buckets.1.doc_count: 8 }
- match: { aggregations.composite_agg.buckets.2.key.str_terms: "xyz" }
- match: { aggregations.composite_agg.buckets.2.doc_count: 5 }

---
"Test composite num_terms agg with doc_count":
- skip:
version: "1.0.0 - "
reason: "until this is released and we know exact version"
- do:
search:
rest_total_hits_as_int: true
body: { "size" : 0, "aggs" :
{ "composite_agg" :
{ "composite" :
{
"sources": ["num_terms" : { "terms" : { "field" : "number" } }]
}
}
}
}

- match: { hits.total: 5 }
- length: { aggregations.composite_agg.buckets: 3 }
- match: { aggregations.composite_agg.buckets.0.key.num_terms: 100 }
- match: { aggregations.composite_agg.buckets.0.doc_count: 12 }
- match: { aggregations.composite_agg.buckets.1.key.num_terms: 200 }
- match: { aggregations.composite_agg.buckets.1.doc_count: 1 }
- match: { aggregations.composite_agg.buckets.2.key.num_terms: 500 }
- match: { aggregations.composite_agg.buckets.2.doc_count: 11 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.index.mapper;

import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.Query;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentParserUtils;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.QueryShardException;
import org.opensearch.search.lookup.SearchLookup;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
* Mapper for the doc_count field.
*
* @opensearch.internal
*/
public class DocCountFieldMapper extends MetadataFieldMapper {

public static final String NAME = "_doc_count";
public static final String CONTENT_TYPE = "_doc_count";

public static final TypeParser PARSER = new ConfigurableTypeParser(
c -> new DocCountFieldMapper(),
c -> new DocCountFieldMapper.Builder()
);

static class Builder extends MetadataFieldMapper.Builder {

Builder() {
super(NAME);
}

@Override
protected List<Parameter<?>> getParameters() {
return Collections.emptyList();
}

@Override
public DocCountFieldMapper build(BuilderContext context) {
return new DocCountFieldMapper();
}
}

/**
* Field type for DocCount Field Mapper
*
* @opensearch.internal
*/
public static final class DocCountFieldType extends MappedFieldType {

public static final DocCountFieldType INSTANCE = new DocCountFieldType();

private static final Long defaultValue = 1L;

public DocCountFieldType() {
super(NAME, false, false, true, TextSearchInfo.NONE, Collections.emptyMap());
}

@Override
public String typeName() {
return CONTENT_TYPE;
}

@Override
public String familyTypeName() {
return NumberFieldMapper.NumberType.LONG.typeName();
}

@Override
public Query existsQuery(QueryShardContext context) {
return new DocValuesFieldExistsQuery(NAME);
}

@Override
public Query termQuery(Object value, QueryShardContext context) {
throw new QueryShardException(context, "Field [" + name() + "] of type [" + typeName() + "] is not searchable");
}

@Override
public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) {
if (format != null) {
throw new IllegalArgumentException("Field [" + name() + "] of type [" + typeName() + "] doesn't support formats.");
}

return new SourceValueFetcher(name(), context, defaultValue) {
@Override
protected Object parseSourceValue(Object value) {
if ("".equals(value)) {
return defaultValue;
} else {
return NumberFieldMapper.NumberType.objectToLong(value, false);
}
}
};
}
}

private DocCountFieldMapper() {
super(DocCountFieldType.INSTANCE);
}

@Override
protected void parseCreateField(ParseContext context) throws IOException {
XContentParser parser = context.parser();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, parser.currentToken(), parser);

long value = parser.longValue(false);
if (value <= 0) {
throw new IllegalArgumentException("Field [" + fieldType().name() + "] must be a positive integer.");
}
final Field docCount = new NumericDocValuesField(NAME, value);
context.doc().add(docCount);
}

@Override
public void preParse(ParseContext context) {}

@Override
public DocCountFieldType fieldType() {
return (DocCountFieldType) super.fieldType();
}

@Override
protected String contentType() {
return CONTENT_TYPE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.index.mapper.CompletionFieldMapper;
import org.opensearch.index.mapper.DataStreamFieldMapper;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.FieldAliasMapper;
import org.opensearch.index.mapper.FieldNamesFieldMapper;
import org.opensearch.index.mapper.GeoPointFieldMapper;
Expand Down Expand Up @@ -192,6 +193,7 @@ private static Map<String, MetadataFieldMapper.TypeParser> initBuiltInMetadataMa
builtInMetadataMappers.put(NestedPathFieldMapper.NAME, NestedPathFieldMapper.PARSER);
builtInMetadataMappers.put(VersionFieldMapper.NAME, VersionFieldMapper.PARSER);
builtInMetadataMappers.put(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PARSER);
builtInMetadataMappers.put(DocCountFieldMapper.NAME, DocCountFieldMapper.PARSER);
// _field_names must be added last so that it has a chance to see all the other mappers
builtInMetadataMappers.put(FieldNamesFieldMapper.NAME, FieldNamesFieldMapper.PARSER);
return Collections.unmodifiableMap(builtInMetadataMappers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public Map<String, Object> metadata() {

@Override
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
preGetSubLeafCollectors();
preGetSubLeafCollectors(ctx);
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
return getLeafCollector(ctx, sub);
}
Expand All @@ -209,7 +209,7 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws
* Can be overridden by aggregator implementations that like the perform an operation before the leaf collectors
* of children aggregators are instantiated for the next segment.
*/
protected void preGetSubLeafCollectors() throws IOException {}
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {}

/**
* Can be overridden by aggregator implementation to be called back when the collection phase starts.
Expand Down
Loading

0 comments on commit fb7d81a

Please sign in to comment.