Add "none" gap_policy to pipeline aggs #44516


Open
wants to merge 13 commits into
base: main
6 changes: 6 additions & 0 deletions docs/reference/aggregations/pipeline.asciidoc
@@ -267,6 +267,12 @@ _insert_zeros_::
This option will replace missing values with a zero (`0`) and pipeline aggregation computation will
proceed as normal.

_none_::
This option does not apply a gap policy at all. If a pipeline agg can tolerate missing/null data
(such as a `bucket_script`), the empty bucket is passed to the aggregation for evaluation. If the
pipeline agg cannot handle missing/null values (like a derivative, which requires a numerical quantity
and cannot process `null`), it may not evaluate, and the metric for that bucket will be missing.
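As a rough illustration of how the three policies differ, the following Python sketch models a series of per-bucket metric values in which `None` stands for an empty bucket. The function name `resolve_gaps` and the list-based model are assumptions made for this example; it is not how Elasticsearch implements gap policies internally.

```python
from typing import List, Optional


def resolve_gaps(values: List[Optional[float]], policy: str) -> List[Optional[float]]:
    """Return the values a pipeline agg would see under a gap policy (simplified model)."""
    if policy == "skip":
        # Empty buckets are dropped; the pipeline agg never sees them.
        return [v for v in values if v is not None]
    if policy == "insert_zeros":
        # Missing values are replaced with zero.
        return [0.0 if v is None else v for v in values]
    if policy == "none":
        # No policy is applied; the consumer must cope with None itself.
        return list(values)
    raise ValueError(f"unknown gap policy: {policy}")


series = [1.0, None, 3.0]  # hypothetical metric values; None marks an empty bucket
skipped = resolve_gaps(series, "skip")
zero_filled = resolve_gaps(series, "insert_zeros")
untouched = resolve_gaps(series, "none")
```

Under `none`, the `None` entry reaches the consumer unchanged, which is why an aggregation that needs a number for every bucket may produce no value there.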




@@ -159,3 +159,83 @@ And the following may be the response:
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]

==== Accessing bucket document count

It is often helpful to have the document count in a bucket script. This is especially important
when a bucket is empty and a different value should be generated.

Bucket scripts have access to a special `doc_count` property which contains the document count
for the particular bucket the script is evaluating:

[source,console]
--------------------------------------------------
POST /sales/_search
{
  "size": 0,
  "aggs" : {
    "sales_per_month" : {
      "date_histogram" : {
        "field" : "date",
        "calendar_interval" : "month"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "price"
          }
        },
        "doc_count_script": {
          "bucket_script": {
            "buckets_path": {
              "totalSales": "total_sales"
            },
            "script": "params.totalSales / doc_count" <1>
          }
        }
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:sales]
<1> `doc_count` is a special parameter which contains the bucket's document count
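To make the arithmetic concrete, here is a small Python sketch of what the script above computes for each bucket. The bucket contents are made-up numbers, and `average_price` is a hypothetical helper for illustration, not part of any Elasticsearch client.

```python
def average_price(total_sales: float, doc_count: int) -> float:
    """Mirror of the script `params.totalSales / doc_count`."""
    return total_sales / doc_count


# Hypothetical monthly buckets: (key, doc_count, total_sales).
buckets = [
    ("2015-01", 3, 600.0),
    ("2015-02", 2, 60.0),
    ("2015-03", 0, 0.0),  # empty bucket
]

# Empty buckets are left out here, so the division never sees a zero
# document count.
averages = {
    key: average_price(total, count)
    for key, count, total in buckets
    if count > 0
}
```

In this model the empty month simply has no entry in `averages`.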

By default, a `bucket_script` has a `skip` gap policy applied. This means empty buckets (buckets with zero document
count) are not evaluated by the script. To evaluate empty buckets as well, set `gap_policy` to `none`
and check `doc_count` in the script to decide which value to emit:

[source,console]
--------------------------------------------------
POST /sales/_search
{
  "size": 0,
  "aggs" : {
    "sales_per_month" : {
      "date_histogram" : {
        "field" : "date",
        "calendar_interval" : "month"
      },
      "aggs": {
        "total_sales": {
          "sum": {
            "field": "price"
          }
        },
        "doc_count_script": {
          "bucket_script": {
            "buckets_path": {
              "totalSales": "total_sales"
            },
            "gap_policy": "none", <1>
            "script": "if (doc_count == 0) {return -1.0} else {return 1.0}" <2>
          }
        }
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:sales]
<1> The gap policy is set to "none", which means the script will evaluate on all buckets
<2> The script checks the document count and emits a -1.0 if the bucket is empty, or a 1.0 if there are documents
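The difference between the two policies can be sketched in Python. The document counts below are assumed values for ten daily buckets with two empty days, and `run_bucket_script` is a hypothetical stand-in for the aggregation, so treat this as a model of the behavior described above rather than the actual implementation.

```python
from typing import List, Optional


def script(doc_count: int) -> float:
    """Same logic as the bucket script: -1.0 for empty buckets, 1.0 otherwise."""
    return -1.0 if doc_count == 0 else 1.0


def run_bucket_script(doc_counts: List[int], gap_policy: str) -> List[Optional[float]]:
    """Evaluate the script per bucket; None means no value was produced."""
    results: List[Optional[float]] = []
    for count in doc_counts:
        if gap_policy == "skip" and count == 0:
            results.append(None)  # empty bucket is not evaluated
        else:
            results.append(script(count))
    return results


# Assumed document counts: ten daily buckets, the 7th and 9th are empty.
doc_counts = [1, 1, 1, 1, 1, 1, 0, 1, 0, 1]
with_skip = run_bucket_script(doc_counts, "skip")
with_none = run_bucket_script(doc_counts, "none")
```

With `skip`, the entries for the two empty buckets are `None`; with `none`, the script runs on them and they become `-1.0`.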
@@ -109,7 +109,7 @@ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundE
BucketAggregationSelectorScript.Factory wrappedFactory = parameters -> new BucketAggregationSelectorScript(parameters) {
@Override
public boolean execute() {
-return factory.newInstance(getParams()).execute().doubleValue() == 1.0;
+return factory.newInstance(getParams()).execute(0).doubleValue() == 1.0;
}
};
return context.factoryClazz.cast(wrappedFactory);
@@ -144,9 +144,11 @@ private static BucketAggregationScript.Factory newBucketAggregationScriptFactory
functionValuesArray[i] = new ReplaceableConstDoubleValues();
functionValuesMap.put(expr.variables[i], functionValuesArray[i]);
}
+//functionValuesMap.put("doc_count", new ReplaceableConstDoubleValues());

return new BucketAggregationScript(parameters) {
@Override
-public Double execute() {
+public Double execute(long docCount) {
getParams().forEach((name, value) -> {
ReplaceableConstDoubleValues placeholder = functionValuesMap.get(name);
if (placeholder == null) {
@@ -554,7 +554,7 @@ public void testPipelineAggregationScript() throws Exception {
.subAggregation(sum("fourSum").field("four"))
.subAggregation(bucketScript("totalSum",
new Script(ScriptType.INLINE,
-ExpressionScriptEngine.NAME, "_value0 + _value1 + _value2", Collections.emptyMap()),
+ExpressionScriptEngine.NAME, "_value0 + _value1 + _value2", new HashMap<>()),
"twoSum", "threeSum", "fourSum")))
.execute().actionGet();

@@ -20,13 +20,13 @@
# This file contains a whitelist for the Moving Function pipeline aggregator in core

class org.elasticsearch.search.aggregations.pipeline.MovingFunctions {
-double max(double[])
-double min(double[])
-double sum(double[])
-double stdDev(double[], double)
-double unweightedAvg(double[])
-double linearWeightedAvg(double[])
-double ewma(double[], double)
-double holt(double[], double, double)
-double holtWinters(double[], double, double, double, int, boolean)
+double max(Double[])
+double min(Double[])
+double sum(Double[])
+double stdDev(Double[], double)
+double unweightedAvg(Double[])
+double linearWeightedAvg(Double[])
+double ewma(Double[], double)
+double holt(Double[], double, double)
+double holtWinters(Double[], double, double, double, int, boolean)
}
@@ -0,0 +1,230 @@
setup:
- do:
    indices.create:
      index: test
      body:
        mappings:
          properties:
            value_field:
              type: integer
            date:
              type: date

- do:
    bulk:
      refresh: true
      body:
        - index:
            _index: test
            _id: 1
        - date: "2017-01-01T00:00:00"
          value_field: 1
        - index:
            _index: test
            _id: 2
        - date: "2017-01-02T00:00:00"
          value_field: 2
        - index:
            _index: test
            _id: 3
        - date: "2017-01-03T00:00:00"
          value_field: 3
        - index:
            _index: test
            _id: 4
        - date: "2017-01-04T00:00:00"
          value_field: 4
        - index:
            _index: test
            _id: 5
        - date: "2017-01-05T00:00:00"
          value_field: 5
        - index:
            _index: test
            _id: 6
        - date: "2017-01-06T00:00:00"
          value_field: 6

- do:
    indices.refresh:
      index: [test]

---
"Bucket script with gap policy skip":

- do:
    bulk:
      refresh: true
      body:
        - index:
            _index: test
            _id: 7
        - date: "2017-01-08T00:00:00"
          value_field: 7
        - index:
            _index: test
            _id: 8
        - date: "2017-01-10T00:00:00"
          value_field: 8


- do:
    indices.refresh:
      index: [test]

- do:
    search:
      rest_total_hits_as_int: true
      body:
        size: 0
        aggs:
          the_histo:
            date_histogram:
              field: "date"
              calendar_interval: "1d"
            aggs:
              the_avg:
                avg:
                  field: "value_field"
              the_bucket_script:
                bucket_script:
                  buckets_path:
                    foo: "the_avg.value"
                  script: "if (doc_count == 0) {return -1.0} else {return 1.0}"
                  gap_policy: "skip"

- match: { hits.total: 8 }

- length: { aggregations.the_histo.buckets: 10 }

- match: { aggregations.the_histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.0.doc_count: 1 }
- match: { aggregations.the_histo.buckets.0.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.1.doc_count: 1 }
- match: { aggregations.the_histo.buckets.1.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.2.doc_count: 1 }
- match: { aggregations.the_histo.buckets.2.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.3.key_as_string: "2017-01-04T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.3.doc_count: 1 }
- match: { aggregations.the_histo.buckets.3.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.4.key_as_string: "2017-01-05T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.4.doc_count: 1 }
- match: { aggregations.the_histo.buckets.4.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.5.key_as_string: "2017-01-06T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.5.doc_count: 1 }
- match: { aggregations.the_histo.buckets.5.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.6.key_as_string: "2017-01-07T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.6.doc_count: 0 }
- is_false: aggregations.the_histo.buckets.6.the_bucket_script.value
- match: { aggregations.the_histo.buckets.6.the_avg.value: null }

- match: { aggregations.the_histo.buckets.7.key_as_string: "2017-01-08T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.7.doc_count: 1 }
- match: { aggregations.the_histo.buckets.7.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.8.key_as_string: "2017-01-09T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.8.doc_count: 0 }
- is_false: aggregations.the_histo.buckets.8.the_bucket_script.value
- match: { aggregations.the_histo.buckets.8.the_avg.value: null }

- match: { aggregations.the_histo.buckets.9.key_as_string: "2017-01-10T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.9.doc_count: 1 }
- match: { aggregations.the_histo.buckets.9.the_bucket_script.value: 1.0 }

---
"Bucket script with gap policy none":

- do:
    bulk:
      refresh: true
      body:
        - index:
            _index: test
            _id: 7
        - date: "2017-01-08T00:00:00"
          value_field: 7
        - index:
            _index: test
            _id: 8
        - date: "2017-01-10T00:00:00"
          value_field: 8


- do:
    indices.refresh:
      index: [test]

- do:
    search:
      rest_total_hits_as_int: true
      body:
        size: 0
        aggs:
          the_histo:
            date_histogram:
              field: "date"
              calendar_interval: "1d"
            aggs:
              the_avg:
                avg:
                  field: "value_field"
              the_bucket_script:
                bucket_script:
                  buckets_path:
                    foo: "the_avg.value"
                  script: "if (doc_count == 0) {return -1.0} else {return 1.0}"
                  gap_policy: "none"

- match: { hits.total: 8 }

- length: { aggregations.the_histo.buckets: 10 }

- match: { aggregations.the_histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.0.doc_count: 1 }
- match: { aggregations.the_histo.buckets.0.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.1.doc_count: 1 }
- match: { aggregations.the_histo.buckets.1.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.2.doc_count: 1 }
- match: { aggregations.the_histo.buckets.2.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.3.key_as_string: "2017-01-04T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.3.doc_count: 1 }
- match: { aggregations.the_histo.buckets.3.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.4.key_as_string: "2017-01-05T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.4.doc_count: 1 }
- match: { aggregations.the_histo.buckets.4.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.5.key_as_string: "2017-01-06T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.5.doc_count: 1 }
- match: { aggregations.the_histo.buckets.5.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.6.key_as_string: "2017-01-07T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.6.doc_count: 0 }
- match: { aggregations.the_histo.buckets.6.the_bucket_script.value: -1.0 }
- match: { aggregations.the_histo.buckets.6.the_avg.value: null }

- match: { aggregations.the_histo.buckets.7.key_as_string: "2017-01-08T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.7.doc_count: 1 }
- match: { aggregations.the_histo.buckets.7.the_bucket_script.value: 1.0 }

- match: { aggregations.the_histo.buckets.8.key_as_string: "2017-01-09T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.8.doc_count: 0 }
- match: { aggregations.the_histo.buckets.8.the_bucket_script.value: -1.0 }
- match: { aggregations.the_histo.buckets.8.the_avg.value: null }

- match: { aggregations.the_histo.buckets.9.key_as_string: "2017-01-10T00:00:00.000Z" }
- match: { aggregations.the_histo.buckets.9.doc_count: 1 }
- match: { aggregations.the_histo.buckets.9.the_bucket_script.value: 1.0 }