Skip to content

Commit

Permalink
[Rollup] Use composite's missing_bucket (elastic#31402)
Browse files Browse the repository at this point in the history
We can leverage the composite agg's new `missing_bucket` feature on
terms groupings.  This means the aggregation criteria used in the indexer
will now return null buckets for missing keys.  

Because all buckets are now returned (even if a key is null),
we can guarantee correct doc counts with
"combined" jobs (where a job rolls up multiple schemas).  This was
previously impossible since composite would ignore documents that
didn't have _all_ the keys, meaning non-overlapping schemas would
cause composite to return no buckets.

Note: date_histo does not use `missing_bucket`, since a timestamp is
always required.

The docs have been adjusted to recommend a single, combined job.  It
also makes reference to the previous issue to help users that are upgrading
(rather than just deleting the sections).
  • Loading branch information
polyfractal authored Jul 13, 2018
1 parent bc1284e commit b7f07f0
Show file tree
Hide file tree
Showing 14 changed files with 298 additions and 263 deletions.
237 changes: 12 additions & 225 deletions x-pack/docs/en/rollup/understanding-groups.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,15 @@ if a field is useful for aggregating later, and how you might wish to use it (te

=== Grouping Limitations with heterogeneous indices

There is a known limitation to Rollup groups, due to some internal implementation details at this time. The Rollup feature leverages
the `composite` aggregation from Elasticsearch. At the moment, the composite agg only returns buckets when all keys in the tuple are non-null.
Put another way, if the you request keys `[A,B,C]` in the composite aggregation, the only documents that are aggregated are those that have
_all_ of the keys `A, B` and `C`.
There was previously a limitation in how Rollup could handle indices that had heterogeneous mappings (multiple, unrelated/non-overlapping
mappings). The recommendation at the time was to configure a separate job per data "type". For example, you might configure a separate
job for each Beats module that you had enabled (one for `process`, another for `filesystem`, etc).

Because Rollup uses the composite agg during the indexing process, it inherits this behavior. Practically speaking, if all of the documents
in your index are homogeneous (they have the same mapping), you can ignore this limitation and stop reading now.
This recommendation was driven by internal implementation details that caused document counts to be potentially incorrect if a single "merged"
job was used.

However, if you have a heterogeneous collection of documents that you wish to roll up, you may need to configure two or more jobs to
accurately cover the original data.
This limitation has since been alleviated. As of 6.4.0, it is now considered best practice to combine all rollup configurations
into a single job.

As an example, if your index has two types of documents:

Expand All @@ -157,7 +156,7 @@ and
--------------------------------------------------
// NOTCONSOLE

it may be tempting to create a single, combined rollup job which covers both of these document types, something like this:
the best practice is to combine them into a single rollup job which covers both of these document types, like this:

[source,js]
--------------------------------------------------
Expand Down Expand Up @@ -191,222 +190,10 @@ PUT _xpack/rollup/job/combined
--------------------------------------------------
// NOTCONSOLE

You can see that it includes a `terms` grouping on both "node" and "title", fields that are mutually exclusive in the document types.
*This will not work.* Because the `composite` aggregation (and by extension, Rollup) only returns buckets when all keys are non-null,
and there are no documents that have both a "node" field and a "title" field, this rollup job will not produce any rollups.

Instead, you should configure two independent jobs (sharing the same index, or going to separate indices):

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/sensor
{
"index_pattern": "data-*",
"rollup_index": "data_rollup",
"cron": "*/30 * * * * ?",
"page_size" :1000,
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["node"]
}
},
"metrics": [
{
"field": "temperature",
"metrics": ["min", "max", "sum"]
}
]
}
--------------------------------------------------
// NOTCONSOLE

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/purchases
{
"index_pattern": "data-*",
"rollup_index": "data_rollup",
"cron": "*/30 * * * * ?",
"page_size" :1000,
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["title"]
}
},
"metrics": [
{
"field": "price",
"metrics": ["avg"]
}
]
}
--------------------------------------------------
// NOTCONSOLE

Notice that each job now deals with a single "document type", and will not run into the limitations described above. We are working on changes
in core Elasticsearch to remove this limitation from the `composite` aggregation, and the documentation will be updated accordingly
when this particular scenario is fixed.

=== Doc counts and overlapping jobs

There is an issue with doc counts, related to the above grouping limitation. Imagine you have two Rollup jobs saving to the same index, where
one job is a "subset" of another job.

For example, you might have jobs with these two groupings:

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/sensor-all
{
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["node"]
}
},
"metrics": [
{
"field": "price",
"metrics": ["avg"]
}
]
...
}
--------------------------------------------------
// NOTCONSOLE

and

[source,js]
--------------------------------------------------
PUT _xpack/rollup/job/sensor-building
{
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["node", "building"]
}
}
...
}
--------------------------------------------------
// NOTCONSOLE


The first job `sensor-all` contains the groupings and metrics that apply to all data in the index. The second job is rolling up a subset
of data (in different buildings) which also include a building identifier. You did this because combining them would run into the limitation
described in the previous section.

This _mostly_ works, but can sometimes return incorrect `doc_counts` when you search. All metrics will be valid however.

The issue arises from the composite agg limitation described before, combined with search-time optimization. Imagine you try to run the
following aggregation:

[source,js]
--------------------------------------------------
"aggs" : {
"nodes": {
"terms": {
"field": "node"
}
}
}
--------------------------------------------------
// NOTCONSOLE

This aggregation could be serviced by either `sensor-all` or `sensor-building` job, since they both group on the node field. So the RollupSearch
API will search both of them and merge results. This will result in *correct* doc_counts and *correct* metrics. No problem here.

The issue arises from an aggregation that can _only_ be serviced by `sensor-building`, like this one:

[source,js]
--------------------------------------------------
"aggs" : {
"nodes": {
"terms": {
"field": "node"
},
"aggs": {
"building": {
"terms": {
"field": "building"
}
}
}
}
}
--------------------------------------------------
// NOTCONSOLE

Now we run into a problem. The RollupSearch API will correctly identify that only `sensor-building` job has all the required components
to answer the aggregation, and will search it exclusively. Unfortunately, due to the composite aggregation limitation, that job only
rolled up documents that have both a "node" and a "building" field. Meaning that the doc_counts for the `"nodes"` aggregation will not
include counts for any document that doesn't have `[node, building]` fields.

- The `doc_count` for `"nodes"` aggregation will be incorrect because it only contains counts for `nodes` that also have buildings
- The `doc_count` for `"buildings"` aggregation will be correct
- Any metrics, on any level, will be correct

==== Workarounds

There are two main workarounds if you find yourself with a schema like the above.

Easiest and most robust method: use separate indices to store your rollups. The limitations arise because you have several document
schemas co-habitating in a single index, which makes it difficult for rollups to correctly summarize. If you make several rollup
jobs and store them in separate indices, these sorts of difficulties do not arise. It does, however, keep you from searching across several
different rollup indices at the same time.

The other workaround is to include an "off-target" aggregation in the query, which pulls in the "superset" job and corrects the doc counts.
The RollupSearch API determines the best job to search for each "leaf node" in the aggregation tree. So if we include a metric agg on `price`,
which was only defined in the `sensor-all` job, that will "pull in" the other job:

[source,js]
--------------------------------------------------
"aggs" : {
"nodes": {
"terms": {
"field": "node"
},
"aggs": {
"building": {
"terms": {
"field": "building"
}
},
"avg_price": {
"avg": { "field": "price" } <1>
}
}
}
}
--------------------------------------------------
// NOTCONSOLE
<1> Adding an avg aggregation here will fix the doc counts

Because only `sensor-all` job had an `avg` on the price field, the RollupSearch API is forced to pull in that additional job for searching,
and will merge/correct the doc_counts as appropriate. This sort of workaround applies to any additional aggregation -- metric or bucketing --
although it can be tedious to look through the jobs and determine the right one to add.

==== Status
There was previously an issue with document counts on "overlapping" job configurations, driven by the same internal implementation detail.
If there were two Rollup jobs saving to the same index, where one job is a "subset" of another job, it was possible that document counts
could be incorrect for certain aggregation arrangements.

We realize this is an onerous limitation, and somewhat breaks the rollup contract of "pick the fields to rollup, we do the rest". We are
actively working to get the limitation to `composite` agg fixed, and the related issues in Rollup. The documentation will be updated when
the fix is implemented.
This issue has also since been eliminated in 6.4.0.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
vsBuilder.dateHistogramInterval(interval);
vsBuilder.field(field);
vsBuilder.timeZone(timeZone);

return Collections.singletonList(vsBuilder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
= new HistogramValuesSourceBuilder(RollupField.formatIndexerAggName(f, HistogramAggregationBuilder.NAME));
vsBuilder.interval(interval);
vsBuilder.field(f);
vsBuilder.missingBucket(true);
return vsBuilder;
}).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public List<CompositeValuesSourceBuilder<?>> toBuilders() {
TermsValuesSourceBuilder vsBuilder
= new TermsValuesSourceBuilder(RollupField.formatIndexerAggName(f, TermsAggregationBuilder.NAME));
vsBuilder.field(f);
vsBuilder.missingBucket(true);
return vsBuilder;
}).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.elasticsearch.xpack.core.rollup;

import org.apache.http.HttpStatus;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
Expand All @@ -27,21 +26,13 @@

public class RollupRestTestStateCleaner {

private final Logger logger;
private final RestClient adminClient;

public RollupRestTestStateCleaner(Logger logger, RestClient adminClient) {
this.logger = logger;
this.adminClient = adminClient;
}

public void clearRollupMetadata() throws Exception {
deleteAllJobs();
waitForPendingTasks();
public static void clearRollupMetadata(RestClient adminClient) throws Exception {
deleteAllJobs(adminClient);
waitForPendingTasks(adminClient);
// indices will be deleted by the ESRestTestCase class
}

private void waitForPendingTasks() throws Exception {
private static void waitForPendingTasks(RestClient adminClient) throws Exception {
ESTestCase.assertBusy(() -> {
try {
Response response = adminClient.performRequest("GET", "/_cat/tasks",
Expand Down Expand Up @@ -71,7 +62,7 @@ private void waitForPendingTasks() throws Exception {
}

@SuppressWarnings("unchecked")
private void deleteAllJobs() throws Exception {
private static void deleteAllJobs(RestClient adminClient) throws Exception {
Response response = adminClient.performRequest("GET", "/_xpack/rollup/job/_all");
Map<String, Object> jobs = ESRestTestCase.entityAsMap(response);
@SuppressWarnings("unchecked")
Expand All @@ -83,9 +74,7 @@ private void deleteAllJobs() throws Exception {
}

for (Map<String, Object> jobConfig : jobConfigs) {
logger.debug(jobConfig);
String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
logger.debug("Deleting job " + jobId);
try {
response = adminClient.performRequest("DELETE", "/_xpack/rollup/job/" + jobId);
} catch (Exception e) {
Expand All @@ -95,7 +84,8 @@ private void deleteAllJobs() throws Exception {
}

private static String responseEntityToString(Response response) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(),
StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ private static InternalAggregation unrollMultiBucket(InternalMultiBucketAggregat
});
} else if (rolled instanceof StringTerms) {
return unrollMultiBucket(rolled, original, currentTree, (bucket, bucketCount, subAggs) -> {

BytesRef key = new BytesRef(bucket.getKeyAsString().getBytes(StandardCharsets.UTF_8));
assert bucketCount >= 0;
//TODO expose getFormatter(), keyed upstream in Core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ static void updateMapping(RollupJob job, ActionListener<PutRollupJobAction.Respo
}

Map<String, Object> rollupMeta = (Map<String, Object>)((Map<String, Object>) m).get(RollupField.ROLLUP_META);

String stringVersion = (String)((Map<String, Object>) m).get(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD);
if (stringVersion == null) {
listener.onFailure(new IllegalStateException("Could not determine version of existing rollup metadata for index ["
+ indexName + "]"));
return;
}

if (rollupMeta.get(job.getConfig().getId()) != null) {
String msg = "Cannot create rollup job [" + job.getConfig().getId()
+ "] because job was previously created (existing metadata).";
Expand Down
Loading

0 comments on commit b7f07f0

Please sign in to comment.