
Fix rate agg with custom _doc_count #79346


Merged: 13 commits, Oct 19, 2021
8 changes: 4 additions & 4 deletions docs/reference/aggregations/metrics/rate-aggregation.asciidoc
@@ -7,7 +7,7 @@
++++

A `rate` metrics aggregation can be used only inside a `date_histogram` or `composite` aggregation. It calculates a rate of documents
or a field in each bucket. The field values can be generated extracted from specific numeric or
or a field in each bucket. The field values can be extracted from specific numeric or
<<histogram,histogram fields>> in the documents.

NOTE: For `composite` aggregations, there must be exactly one `date_histogram` source for the `rate` aggregation to be supported.
@@ -27,7 +27,7 @@ A `rate` aggregation looks like this in isolation:
--------------------------------------------------
// NOTCONSOLE

The following request will group all sales records into monthly bucket and than convert the number of sales transaction in each bucket
The following request will group all sales records into monthly buckets and then convert the number of sales transactions in each bucket
into per annual sales rate.

[source,console]
@@ -56,8 +56,8 @@ GET sales/_search
<1> Histogram is grouped by month.
<2> But the rate is converted into annual rate.

The response will return the annual rate of transaction in each bucket. Since there are 12 months per year, the annual rate will
be automatically calculated by multiplying monthly rate by 12.
The response will return the annual rate of transactions in each bucket. Since there are 12 months per year, the annual rate will
be automatically calculated by multiplying the monthly rate by 12.

[source,console-result]
--------------------------------------------------
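The example request and response bodies are collapsed in the diff above. For orientation, here is a minimal sketch of the kind of request the surrounding prose describes (monthly `date_histogram` buckets with an annual `rate`); the `date` field name is assumed from the standard sales docs example rather than taken from this diff:

[source,console]
--------------------------------------------------
GET sales/_search
{
  "size": 0,
  "aggs": {
    "by_date": {
      "date_histogram": {
        "field": "date",
        "calendar_interval": "month"
      },
      "aggs": {
        "my_rate": {
          "rate": {
            "unit": "year"
          }
        }
      }
    }
  }
}
--------------------------------------------------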
AbstractRateAggregator.java
@@ -29,6 +29,7 @@ public abstract class AbstractRateAggregator extends NumericMetricsAggregator.Si
private final Rounding.DateTimeUnit rateUnit;
protected final RateMode rateMode;
private final SizedBucketAggregator sizedBucketAggregator;
protected final boolean computeWithDocCount;

protected DoubleArray sums;
protected DoubleArray compensations;
Expand All @@ -55,6 +56,8 @@ public AbstractRateAggregator(
this.rateUnit = rateUnit;
this.rateMode = rateMode;
this.sizedBucketAggregator = findSizedBucketAncestor();
// If no fields or scripts have been defined in the agg, rate should be computed based on bucket doc_counts
this.computeWithDocCount = valuesSourceConfig.fieldContext() == null && valuesSourceConfig.script() == null;
}

private SizedBucketAggregator findSizedBucketAncestor() {
@@ -112,5 +115,4 @@ public InternalAggregation buildEmptyAggregation() {
public void doClose() {
Releasables.close(sums, compensations);
}

}
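For context on the new `computeWithDocCount` flag: it is set when the aggregation has neither a `field` nor a `script`, and it routes the rate down the doc-count-based path added in `NumericRateAggregator` below. A hedged sketch of the two configurations; `my-index`, `timestamp`, and `price` are illustrative names, not from this PR:

[source,console]
--------------------------------------------------
GET my-index/_search
{
  "size": 0,
  "aggs": {
    "by_date": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "month"
      },
      "aggs": {
        "doc_rate": {
          "rate": { "unit": "year" }                      <1>
        },
        "price_rate": {
          "rate": { "field": "price", "unit": "year" }    <2>
        }
      }
    }
  }
}
--------------------------------------------------
<1> No `field` or `script`, so `computeWithDocCount` is true and the rate is computed from bucket doc counts, which with this fix respect `_doc_count`.
<2> A `field` is set, so values are extracted from `price` and the doc-count path is not used.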
NumericRateAggregator.java
@@ -13,6 +13,7 @@
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.DocCountProvider;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
@@ -22,6 +23,9 @@
import java.util.Map;

public class NumericRateAggregator extends AbstractRateAggregator {

private final DocCountProvider docCountProvider;

public NumericRateAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
@@ -32,42 +36,68 @@ NumericRateAggregator(
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, rateUnit, rateMode, context, parent, metadata);
docCountProvider = computeWithDocCount ? new DocCountProvider() : null;
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
final CompensatedSum kahanSummation = new CompensatedSum(0, 0);
final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
sums = bigArrays().grow(sums, bucket + 1);
compensations = bigArrays().grow(compensations, bucket + 1);

if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
if (computeWithDocCount) {
// No field or script has been set at the rate agg. So, rate will be computed based on the doc_counts.
// This implementation hard-wires the DocCountProvider and reads the _doc_count fields when available.
// A better approach would be to create a DOC_COUNT ValuesSource type and use that as valuesSource
// In that case the computeRateOnDocs variable and this branch of the if-statement are not required.
docCountProvider.setLeafReaderContext(ctx);
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int doc, long bucket) throws IOException {
sums = bigArrays().grow(sums, bucket + 1);
compensations = bigArrays().grow(compensations, bucket + 1);
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);
switch (rateMode) {
case SUM:
for (int i = 0; i < valuesCount; i++) {
kahanSummation.add(values.nextValue());
}
break;
case VALUE_COUNT:
kahanSummation.add(valuesCount);
break;
default:
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
}

final int docCount = docCountProvider.getDocCount(doc);
kahanSummation.add(docCount);
compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}
}
};
};
} else {
final SortedNumericDoubleValues values = ((ValuesSource.Numeric) valuesSource).doubleValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long bucket) throws IOException {
sums = bigArrays().grow(sums, bucket + 1);
compensations = bigArrays().grow(compensations, bucket + 1);

if (values.advanceExact(doc)) {
final int valuesCount = values.docValueCount();
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
double sum = sums.get(bucket);
double compensation = compensations.get(bucket);
kahanSummation.reset(sum, compensation);
switch (rateMode) {
case SUM:
for (int i = 0; i < valuesCount; i++) {
kahanSummation.add(values.nextValue());
}
break;
case VALUE_COUNT:
kahanSummation.add(valuesCount);
break;
default:
throw new IllegalArgumentException("Unsupported rate mode " + rateMode);
}

compensations.set(bucket, kahanSummation.delta());
sums.set(bucket, kahanSummation.value());
}
}
};
}
}
}
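Both branches of the collector fold values into a `CompensatedSum` rather than a bare `double`. As a reminder of what that buys: to add a value `v` to a running sum `s` with compensation `c`, the textbook Kahan recurrence (which `CompensatedSum` implements up to sign conventions) computes `y = v - c`, `t = s + y`, `c = (t - s) - y`, `s = t`. The new `c` captures the low-order bits lost when `s + y` is rounded and feeds them back into the next addition, which keeps the sum accurate even when many small doc counts are folded into a large running total.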
Rate aggregator unit tests
@@ -24,6 +24,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.mapper.CustomTermFreqField;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
@@ -859,6 +860,18 @@ public void testModeWithoutField() {
assertEquals("The mode parameter is only supported with field or script", ex.getMessage());
}

public void testWithCustomDocCount() throws IOException {
testCase(new MatchAllDocsQuery(), "month", true, "month", null, iw -> {
iw.addDocument(doc("2010-03-12T01:07:45", new CustomTermFreqField("_doc_count", "_doc_count", 10)));
iw.addDocument(doc("2010-04-01T03:43:34"));
iw.addDocument(doc("2010-04-27T03:43:34", new CustomTermFreqField("_doc_count", "_doc_count", 5)));
}, dh -> {
assertThat(dh.getBuckets(), hasSize(2));
assertThat(((InternalRate) dh.getBuckets().get(0).getAggregations().asList().get(0)).value(), closeTo(10.0, 0.000001));
assertThat(((InternalRate) dh.getBuckets().get(1).getAggregations().asList().get(0)).value(), closeTo(6.0, 0.000001));
});
}

private static AbstractAggregationBuilder<?> randomValidMultiBucketAggBuilder(
RateAggregationBuilder rateAggregationBuilder,
DateHistogramInterval interval
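The new unit test seeds `_doc_count` by indexing a `CustomTermFreqField` directly, which mirrors how the `_doc_count` metadata field is stored: a single-token field whose term frequency carries the count. At the REST level the equivalent pre-aggregated document would look roughly like this (index name and id are made up for the sketch):

[source,console]
--------------------------------------------------
PUT my-index/_doc/1
{
  "timestamp": "2010-03-12T01:07:45",
  "_doc_count": 10
}
--------------------------------------------------

A document indexed this way counts as 10 documents in every bucket it lands in, so the March bucket in the test reports a rate of 10.0, while the April bucket sums a plain document (1) and a `_doc_count: 5` document to 6.0.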
Rate aggregation YAML REST tests
@@ -34,3 +34,44 @@ setup:
- length: { aggregations.by_date.buckets: 2 }
- match: { aggregations.by_date.buckets.0.rate.value: 1.0 }
- match: { aggregations.by_date.buckets.1.rate.value: 2.0 }


---
"rate with doc_count":
- skip:
version: " - 7.99.99"
reason: bug fixed in 8.0.0
- do:
bulk:
index: test2
refresh: true
body:
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:33:37.477Z", "_doc_count": 10}'
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:35:37.477Z", "_doc_count": 5}'
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:35:38.477Z", "_doc_count": 1}'
- '{"index": {}}'
- '{"timestamp": "2021-09-14T22:36:08.477Z"}'
- do:
search:
size: 0
index: "test2"
body:
aggs:
by_date:
date_histogram:
field: timestamp
fixed_interval: 60s
aggs:
rate:
rate:
unit: minute

- length: { aggregations.by_date.buckets: 4 }
- match: { aggregations.by_date.buckets.0.rate.value: 10.0 }
- match: { aggregations.by_date.buckets.1.rate.value: 0.0 }
- match: { aggregations.by_date.buckets.2.rate.value: 6.0 }
- match: { aggregations.by_date.buckets.3.rate.value: 1.0 }
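
These expectations follow directly from the data: with `fixed_interval: 60s` and `unit: minute`, each bucket spans exactly one rate unit, so the rate is simply the sum of doc counts per bucket: 10 for the 22:33 bucket, 0 for the empty 22:34 bucket, 5 + 1 = 6 for the two documents in the 22:35 bucket, and 1 for the last document, which has no `_doc_count` and therefore counts as a single document.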