From b7f07f03edb9c5be2665d65eafc8fe9ad95d0fa1 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 13 Jul 2018 10:07:42 -0400 Subject: [PATCH] [Rollup] Use composite's missing_bucket (#31402) We can leverage the composite agg's new `missing_bucket` feature on terms groupings. This means the aggregation criteria used in the indexer will now return null buckets for missing keys. Because all buckets are now returned (even if a key is null), we can guarantee correct doc counts with "combined" jobs (where a job rolls up multiple schemas). This was previously impossible since composite would ignore documents that didn't have _all_ the keys, meaning non-overlapping schemas would cause composite to return no buckets. Note: date_histo does not use `missing_bucket`, since a timestamp is always required. The docs have been adjusted to recommend a single, combined job. It also makes reference to the previous issue to help users that are upgrading (rather than just deleting the sections). --- .../en/rollup/understanding-groups.asciidoc | 237 +----------------- .../core/rollup/job/DateHistoGroupConfig.java | 1 - .../core/rollup/job/HistoGroupConfig.java | 1 + .../core/rollup/job/TermsGroupConfig.java | 1 + .../rollup/RollupRestTestStateCleaner.java | 24 +- .../rollup/RollupResponseTranslator.java | 1 + .../action/TransportPutRollupJobAction.java | 8 + .../xpack/rollup/job/IndexerUtils.java | 15 +- .../xpack/rollup/job/RollupIndexer.java | 1 + .../RollupResponseTranslationTests.java | 48 ++++ .../action/PutJobStateMachineTests.java | 55 +++- .../xpack/rollup/job/IndexerUtilsTests.java | 106 +++++++- .../xpack/test/rest/XPackRestIT.java | 2 +- .../xpack/restart/FullClusterRestartIT.java | 61 ++++- 14 files changed, 298 insertions(+), 263 deletions(-) diff --git a/x-pack/docs/en/rollup/understanding-groups.asciidoc b/x-pack/docs/en/rollup/understanding-groups.asciidoc index f57f905ae04c8..803555b2d73f7 100644 --- a/x-pack/docs/en/rollup/understanding-groups.asciidoc +++ b/x-pack/docs/en/rollup/understanding-groups.asciidoc @@ -121,16 +121,15 @@ if a field is useful for aggregating later, and how you might wish to use it (te === Grouping Limitations with heterogeneous indices -There is a known limitation to Rollup groups, due to some internal implementation details at this time. The Rollup feature leverages -the `composite` aggregation from Elasticsearch. At the moment, the composite agg only returns buckets when all keys in the tuple are non-null. -Put another way, if the you request keys `[A,B,C]` in the composite aggregation, the only documents that are aggregated are those that have -_all_ of the keys `A, B` and `C`. +There was previously a limitation in how Rollup could handle indices that had heterogeneous mappings (multiple, unrelated/non-overlapping +mappings). The recommendation at the time was to configure a separate job per data "type". For example, you might configure a separate +job for each Beats module that you had enabled (one for `process`, another for `filesystem`, etc). -Because Rollup uses the composite agg during the indexing process, it inherits this behavior. Practically speaking, if all of the documents -in your index are homogeneous (they have the same mapping), you can ignore this limitation and stop reading now. +This recommendation was driven by internal implementation details that caused document counts to be potentially incorrect if a single "merged" +job was used. 
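+
+The root cause was the `composite` aggregation that Rollup uses internally when indexing: it only returned a bucket when _every_
+key in the grouping tuple was non-null, so any document that lacked one of the configured grouping fields was silently skipped
+and never counted. The composite aggregation now supports a `missing_bucket` option, which the Rollup indexer enables on each of
+its `terms` and `histogram` sources (`date_histogram` is excluded, since a timestamp is always required). Documents missing a
+grouping field now land in an explicit `null` bucket instead of being dropped. As a rough illustration (this is internal to the
+indexer, not a query you write yourself), the generated grouping looks something like this:
+
+[source,js]
+--------------------------------------------------
+"composite": {
+  "sources": [
+    { "node.terms": { "terms": { "field": "node", "missing_bucket": true } } }
+  ]
+}
+--------------------------------------------------
+// NOTCONSOLE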
-However, if you have a heterogeneous collection of documents that you wish to roll up, you may need to configure two or more jobs to -accurately cover the original data. +This limitation has since been alleviated. As of 6.4.0, it is now considered best practice to combine all rollup configurations +into a single job. As an example, if your index has two types of documents: @@ -157,7 +156,7 @@ and -------------------------------------------------- // NOTCONSOLE -it may be tempting to create a single, combined rollup job which covers both of these document types, something like this: +the best practice is to combine them into a single rollup job which covers both of these document types, like this: [source,js] -------------------------------------------------- @@ -191,222 +190,10 @@ PUT _xpack/rollup/job/combined -------------------------------------------------- // NOTCONSOLE -You can see that it includes a `terms` grouping on both "node" and "title", fields that are mutually exclusive in the document types. -*This will not work.* Because the `composite` aggregation (and by extension, Rollup) only returns buckets when all keys are non-null, -and there are no documents that have both a "node" field and a "title" field, this rollup job will not produce any rollups. - -Instead, you should configure two independent jobs (sharing the same index, or going to separate indices): - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/sensor -{ - "index_pattern": "data-*", - "rollup_index": "data_rollup", - "cron": "*/30 * * * * ?", - "page_size" :1000, - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["node"] - } - }, - "metrics": [ - { - "field": "temperature", - "metrics": ["min", "max", "sum"] - } - ] -} --------------------------------------------------- -// NOTCONSOLE - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/purchases -{ - "index_pattern": "data-*", - "rollup_index": "data_rollup", - "cron": "*/30 * * * * ?", - "page_size" :1000, - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["title"] - } - }, - "metrics": [ - { - "field": "price", - "metrics": ["avg"] - } - ] -} --------------------------------------------------- -// NOTCONSOLE - -Notice that each job now deals with a single "document type", and will not run into the limitations described above. We are working on changes -in core Elasticsearch to remove this limitation from the `composite` aggregation, and the documentation will be updated accordingly -when this particular scenario is fixed. - === Doc counts and overlapping jobs -There is an issue with doc counts, related to the above grouping limitation. Imagine you have two Rollup jobs saving to the same index, where -one job is a "subset" of another job. - -For example, you might have jobs with these two groupings: - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/sensor-all -{ - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["node"] - } - }, - "metrics": [ - { - "field": "price", - "metrics": ["avg"] - } - ] - ... 
-} --------------------------------------------------- -// NOTCONSOLE - -and - -[source,js] --------------------------------------------------- -PUT _xpack/rollup/job/sensor-building -{ - "groups" : { - "date_histogram": { - "field": "timestamp", - "interval": "1h", - "delay": "7d" - }, - "terms": { - "fields": ["node", "building"] - } - } - ... -} --------------------------------------------------- -// NOTCONSOLE - - -The first job `sensor-all` contains the groupings and metrics that apply to all data in the index. The second job is rolling up a subset -of data (in different buildings) which also include a building identifier. You did this because combining them would run into the limitation -described in the previous section. - -This _mostly_ works, but can sometimes return incorrect `doc_counts` when you search. All metrics will be valid however. - -The issue arises from the composite agg limitation described before, combined with search-time optimization. Imagine you try to run the -following aggregation: - -[source,js] --------------------------------------------------- -"aggs" : { - "nodes": { - "terms": { - "field": "node" - } - } -} --------------------------------------------------- -// NOTCONSOLE - -This aggregation could be serviced by either `sensor-all` or `sensor-building` job, since they both group on the node field. So the RollupSearch -API will search both of them and merge results. This will result in *correct* doc_counts and *correct* metrics. No problem here. - -The issue arises from an aggregation that can _only_ be serviced by `sensor-building`, like this one: - -[source,js] --------------------------------------------------- -"aggs" : { - "nodes": { - "terms": { - "field": "node" - }, - "aggs": { - "building": { - "terms": { - "field": "building" - } - } - } - } -} --------------------------------------------------- -// NOTCONSOLE - -Now we run into a problem. The RollupSearch API will correctly identify that only `sensor-building` job has all the required components -to answer the aggregation, and will search it exclusively. Unfortunately, due to the composite aggregation limitation, that job only -rolled up documents that have both a "node" and a "building" field. Meaning that the doc_counts for the `"nodes"` aggregation will not -include counts for any document that doesn't have `[node, building]` fields. - -- The `doc_count` for `"nodes"` aggregation will be incorrect because it only contains counts for `nodes` that also have buildings -- The `doc_count` for `"buildings"` aggregation will be correct -- Any metrics, on any level, will be correct - -==== Workarounds - -There are two main workarounds if you find yourself with a schema like the above. - -Easiest and most robust method: use separate indices to store your rollups. The limitations arise because you have several document -schemas co-habitating in a single index, which makes it difficult for rollups to correctly summarize. If you make several rollup -jobs and store them in separate indices, these sorts of difficulties do not arise. It does, however, keep you from searching across several -different rollup indices at the same time. - -The other workaround is to include an "off-target" aggregation in the query, which pulls in the "superset" job and corrects the doc counts. -The RollupSearch API determines the best job to search for each "leaf node" in the aggregation tree. 
So if we include a metric agg on `price`,
-which was only defined in the `sensor-all` job, that will "pull in" the other job:
-
-[source,js]
--------------------------------------------------
-"aggs" : {
-  "nodes": {
-    "terms": {
-      "field": "node"
-    },
-    "aggs": {
-      "building": {
-        "terms": {
-          "field": "building"
-        }
-      },
-      "avg_price": {
-        "avg": { "field": "price" } <1>
-      }
-    }
-  }
-}
--------------------------------------------------
-// NOTCONSOLE
-<1> Adding an avg aggregation here will fix the doc counts
-
-Because only `sensor-all` job had an `avg` on the price field, the RollupSearch API is forced to pull in that additional job for searching,
-and will merge/correct the doc_counts as appropriate. This sort of workaround applies to any additional aggregation -- metric or bucketing --
-although it can be tedious to look through the jobs and determine the right one to add.
-
-==== Status
+There was previously an issue with document counts on "overlapping" job configurations, driven by the same internal implementation detail.
+If there were two Rollup jobs saving to the same index, where one job was a "subset" of the other, it was possible that document counts
+could be incorrect for certain aggregation arrangements.
-We realize this is an onerous limitation, and somewhat breaks the rollup contract of "pick the fields to rollup, we do the rest". We are
-actively working to get the limitation to `composite` agg fixed, and the related issues in Rollup. The documentation will be updated when
-the fix is implemented.
\ No newline at end of file
+This issue has also been eliminated as of 6.4.0.
\ No newline at end of file
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java
index 4b4e4cf7b7c81..4a9fbde61d6be 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/DateHistoGroupConfig.java
@@ -159,7 +159,6 @@ public List> toBuilders() {
         vsBuilder.dateHistogramInterval(interval);
         vsBuilder.field(field);
         vsBuilder.timeZone(timeZone);
-
         return Collections.singletonList(vsBuilder);
     }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java
index 8b8d53b4ce9af..2b1511077d955 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/HistoGroupConfig.java
@@ -96,6 +96,7 @@ public List> toBuilders() {
                 = new HistogramValuesSourceBuilder(RollupField.formatIndexerAggName(f, HistogramAggregationBuilder.NAME));
             vsBuilder.interval(interval);
             vsBuilder.field(f);
+            vsBuilder.missingBucket(true);
             return vsBuilder;
         }).collect(Collectors.toList());
     }
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java
index 2f1c35a73edb4..da73020f0087f 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/TermsGroupConfig.java
@@ -80,6 +80,7 @@ public List> toBuilders() {
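+        // missingBucket(true) below tells the composite aggregation to emit a null-keyed bucket for documents
+        // that lack this field, instead of silently skipping them. This is what allows a single "combined" job
+        // to report correct doc counts over documents with non-overlapping mappings.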
TermsValuesSourceBuilder vsBuilder = new TermsValuesSourceBuilder(RollupField.formatIndexerAggName(f, TermsAggregationBuilder.NAME)); vsBuilder.field(f); + vsBuilder.missingBucket(true); return vsBuilder; }).collect(Collectors.toList()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java index 9938f3a41962b..ae171f138cf46 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/rollup/RollupRestTestStateCleaner.java @@ -6,7 +6,6 @@ package org.elasticsearch.xpack.core.rollup; import org.apache.http.HttpStatus; -import org.apache.logging.log4j.Logger; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -27,21 +26,13 @@ public class RollupRestTestStateCleaner { - private final Logger logger; - private final RestClient adminClient; - - public RollupRestTestStateCleaner(Logger logger, RestClient adminClient) { - this.logger = logger; - this.adminClient = adminClient; - } - - public void clearRollupMetadata() throws Exception { - deleteAllJobs(); - waitForPendingTasks(); + public static void clearRollupMetadata(RestClient adminClient) throws Exception { + deleteAllJobs(adminClient); + waitForPendingTasks(adminClient); // indices will be deleted by the ESRestTestCase class } - private void waitForPendingTasks() throws Exception { + private static void waitForPendingTasks(RestClient adminClient) throws Exception { ESTestCase.assertBusy(() -> { try { Response response = adminClient.performRequest("GET", "/_cat/tasks", @@ -71,7 +62,7 @@ private void waitForPendingTasks() throws Exception { } @SuppressWarnings("unchecked") - private void deleteAllJobs() throws Exception { + private static void deleteAllJobs(RestClient adminClient) throws Exception { Response response = adminClient.performRequest("GET", "/_xpack/rollup/job/_all"); Map jobs = ESRestTestCase.entityAsMap(response); @SuppressWarnings("unchecked") @@ -83,9 +74,7 @@ private void deleteAllJobs() throws Exception { } for (Map jobConfig : jobConfigs) { - logger.debug(jobConfig); String jobId = (String) ((Map) jobConfig.get("config")).get("id"); - logger.debug("Deleting job " + jobId); try { response = adminClient.performRequest("DELETE", "/_xpack/rollup/job/" + jobId); } catch (Exception e) { @@ -95,7 +84,8 @@ private void deleteAllJobs() throws Exception { } private static String responseEntityToString(Response response) throws Exception { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), + StandardCharsets.UTF_8))) { return reader.lines().collect(Collectors.joining("\n")); } } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java index ba1002896c041..4042e98ef93fb 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/RollupResponseTranslator.java @@ -382,6 +382,7 @@ private static 
InternalAggregation unrollMultiBucket(InternalMultiBucketAggregat }); } else if (rolled instanceof StringTerms) { return unrollMultiBucket(rolled, original, currentTree, (bucket, bucketCount, subAggs) -> { + BytesRef key = new BytesRef(bucket.getKeyAsString().getBytes(StandardCharsets.UTF_8)); assert bucketCount >= 0; //TODO expose getFormatter(), keyed upstream in Core diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index 081b97b4ee777..889dfa3ac8efc 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -173,6 +173,14 @@ static void updateMapping(RollupJob job, ActionListener rollupMeta = (Map)((Map) m).get(RollupField.ROLLUP_META); + + String stringVersion = (String)((Map) m).get(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD); + if (stringVersion == null) { + listener.onFailure(new IllegalStateException("Could not determine version of existing rollup metadata for index [" + + indexName + "]")); + return; + } + if (rollupMeta.get(job.getConfig().getId()) != null) { String msg = "Cannot create rollup job [" + job.getConfig().getId() + "] because job was previously created (existing metadata)."; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java index e180e34c4cc26..efac4c2d61b98 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/IndexerUtils.java @@ -45,7 +45,7 @@ class IndexerUtils { * @param rollupIndex The index that holds rollups for this job * @return A list of rolled documents derived from the response */ - static List processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats, + static List processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats, GroupConfig groupConfig, String jobId) { logger.debug("Buckets: [" + agg.getBuckets().size() + "][" + jobId + "]"); @@ -80,6 +80,7 @@ private static CRC32 processKeys(Map keys, Map d doc.put(k + "." + RollupField.COUNT_FIELD, count); if (k.endsWith("." + DateHistogramAggregationBuilder.NAME)) { + assert v != null; doc.put(k + "." + RollupField.TIMESTAMP, v); doc.put(k + "." + RollupField.INTERVAL, groupConfig.getDateHisto().getInterval()); doc.put(k + "." + DateHistoGroupConfig.TIME_ZONE, groupConfig.getDateHisto().getTimeZone().toString()); @@ -87,10 +88,18 @@ private static CRC32 processKeys(Map keys, Map d } else if (k.endsWith("." + HistogramAggregationBuilder.NAME)) { doc.put(k + "." + RollupField.VALUE, v); doc.put(k + "." + RollupField.INTERVAL, groupConfig.getHisto().getInterval()); - docID.update(Numbers.doubleToBytes((Double)v), 0, 8); + if (v == null) { + // Arbitrary value to update the doc ID with for nulls + docID.update(19); + } else { + docID.update(Numbers.doubleToBytes((Double) v), 0, 8); + } } else if (k.endsWith("." + TermsAggregationBuilder.NAME)) { doc.put(k + "." 
+ RollupField.VALUE, v); - if (v instanceof String) { + if (v == null) { + // Arbitrary value to update the doc ID with for nulls + docID.update(19); + } else if (v instanceof String) { byte[] vs = ((String) v).getBytes(StandardCharsets.UTF_8); docID.update(vs, 0, vs.length); } else if (v instanceof Long) { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java index a07f1e7d32e7c..1711c0e34eb1e 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupIndexer.java @@ -401,6 +401,7 @@ private CompositeAggregationBuilder createCompositeBuilder(RollupJobConfig confi composite.setMetaData(metadata); } composite.size(config.getPageSize()); + return composite; } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java index 7b03d8e8d038d..98e3ad8197a51 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupResponseTranslationTests.java @@ -1082,6 +1082,54 @@ public void testStringTerms() throws IOException { assertThat(unrolled.toString(), not(equalTo(responses.get(1).toString()))); } + public void testStringTermsNullValue() throws IOException { + TermsAggregationBuilder nonRollupTerms = new TermsAggregationBuilder("terms", ValueType.STRING) + .field("stringField"); + + TermsAggregationBuilder rollupTerms = new TermsAggregationBuilder("terms", ValueType.STRING) + .field("stringfield.terms." + RollupField.VALUE) + .subAggregation(new SumAggregationBuilder("terms." + COUNT_FIELD) + .field("stringfield.terms." + RollupField.COUNT_FIELD)); + + KeywordFieldMapper.Builder nrBuilder = new KeywordFieldMapper.Builder("terms"); + KeywordFieldMapper.KeywordFieldType nrFTterm = nrBuilder.fieldType(); + nrFTterm.setHasDocValues(true); + nrFTterm.setName(nonRollupTerms.field()); + + KeywordFieldMapper.Builder rBuilder = new KeywordFieldMapper.Builder("terms"); + KeywordFieldMapper.KeywordFieldType rFTterm = rBuilder.fieldType(); + rFTterm.setHasDocValues(true); + rFTterm.setName(rollupTerms.field()); + + NumberFieldMapper.Builder valueBuilder = new NumberFieldMapper.Builder("terms." + RollupField.COUNT_FIELD, + NumberFieldMapper.NumberType.LONG); + MappedFieldType rFTvalue = valueBuilder.fieldType(); + rFTvalue.setHasDocValues(true); + rFTvalue.setName("stringfield.terms." 
+ RollupField.COUNT_FIELD); + + List responses = doQueries(new MatchAllDocsQuery(), + iw -> { + iw.addDocument(stringValueDoc("abc")); + iw.addDocument(stringValueDoc("abc")); + iw.addDocument(stringValueDoc("abc")); + + // off target + Document doc = new Document(); + doc.add(new SortedSetDocValuesField("otherField", new BytesRef("other"))); + iw.addDocument(doc); + }, nonRollupTerms, + iw -> { + iw.addDocument(stringValueRollupDoc("abc", 3)); + }, rollupTerms, + new MappedFieldType[]{nrFTterm}, new MappedFieldType[]{rFTterm, rFTvalue}); + + InternalAggregation unrolled = RollupResponseTranslator.unrollAgg(responses.get(1), null, null, 0); + + // The null_value placeholder should be removed from the response and not visible here + assertThat(unrolled.toString(), equalTo(responses.get(0).toString())); + assertThat(unrolled.toString(), not(equalTo(responses.get(1).toString()))); + } + public void testLongTerms() throws IOException { TermsAggregationBuilder nonRollupTerms = new TermsAggregationBuilder("terms", ValueType.LONG) .field("longField"); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index 64cf9d2e3fe21..58fa9d4533bc3 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -28,9 +28,12 @@ import org.elasticsearch.xpack.core.rollup.job.RollupJob; import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import org.elasticsearch.xpack.rollup.Rollup; import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -203,6 +206,43 @@ public void testNoMetadataInMapping() { verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); } + @SuppressWarnings("unchecked") + public void testNoMappingVersion() { + RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); + + ActionListener testListener = ActionListener.wrap(response -> { + fail("Listener success should not have been triggered."); + }, e -> { + assertThat(e.getMessage(), equalTo("Could not determine version of existing rollup metadata for index [" + + job.getConfig().getRollupIndex() + "]")); + }); + + Logger logger = mock(Logger.class); + Client client = mock(Client.class); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); + doAnswer(invocation -> { + GetMappingsResponse response = mock(GetMappingsResponse.class); + Map m = new HashMap<>(2); + m.put(RollupField.ROLLUP_META, + Collections.singletonMap(job.getConfig().getId(), job.getConfig())); + MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, + Collections.singletonMap("_meta", m)); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); + builder.put(RollupField.TYPE_NAME, meta); + + ImmutableOpenMap.Builder> builder2 = ImmutableOpenMap.builder(1); + builder2.put(job.getConfig().getRollupIndex(), builder.build()); + + when(response.getMappings()).thenReturn(builder2.build()); + requestCaptor.getValue().onResponse(response); + return null; + }).when(client).execute(eq(GetMappingsAction.INSTANCE), 
any(GetMappingsRequest.class), requestCaptor.capture()); + + TransportPutRollupJobAction.updateMapping(job, testListener, mock(PersistentTasksService.class), client, logger); + verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); + } + @SuppressWarnings("unchecked") public void testJobAlreadyInMapping() { RollupJob job = new RollupJob(ConfigTestHelpers.getRollupJob("foo").build(), Collections.emptyMap()); @@ -219,10 +259,12 @@ public void testJobAlreadyInMapping() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); doAnswer(invocation -> { GetMappingsResponse response = mock(GetMappingsResponse.class); + Map m = new HashMap<>(2); + m.put(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.V_6_4_0); + m.put(RollupField.ROLLUP_META, + Collections.singletonMap(job.getConfig().getId(), job.getConfig())); MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, - Collections.singletonMap("_meta", - Collections.singletonMap(RollupField.ROLLUP_META, - Collections.singletonMap(job.getConfig().getId(), job.getConfig())))); + Collections.singletonMap("_meta", m)); ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); builder.put(RollupField.TYPE_NAME, meta); @@ -258,9 +300,12 @@ public void testAddJobToMapping() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ActionListener.class); doAnswer(invocation -> { GetMappingsResponse response = mock(GetMappingsResponse.class); + Map m = new HashMap<>(2); + m.put(Rollup.ROLLUP_TEMPLATE_VERSION_FIELD, Version.V_6_4_0); + m.put(RollupField.ROLLUP_META, + Collections.singletonMap(unrelatedJob.getId(), unrelatedJob)); MappingMetaData meta = new MappingMetaData(RollupField.TYPE_NAME, - Collections.singletonMap("_meta", Collections.singletonMap(RollupField.ROLLUP_META, - Collections.singletonMap(unrelatedJob.getId(), unrelatedJob)))); + Collections.singletonMap("_meta", m)); ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); builder.put(RollupField.TYPE_NAME, meta); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java index 53421faa9bc38..07ad0af7f1c38 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/IndexerUtilsTests.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.store.Directory; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; @@ -33,12 +34,13 @@ import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; +import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; import org.elasticsearch.xpack.core.rollup.RollupField; import org.elasticsearch.xpack.core.rollup.job.DateHistoGroupConfig; import org.elasticsearch.xpack.core.rollup.job.GroupConfig; import org.elasticsearch.xpack.core.rollup.job.MetricConfig; import org.elasticsearch.xpack.core.rollup.job.RollupJobStats; -import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers; +import 
org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig; import org.joda.time.DateTime; import org.mockito.stubbing.Answer; @@ -50,8 +52,8 @@ import java.util.List; import java.util.Map; -import static org.mockito.Mockito.mock; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndexerUtilsTests extends AggregatorTestCase { @@ -359,6 +361,106 @@ public void testKeyOrdering() { assertThat(docs.get(0).id(), equalTo("1237859798")); } + public void testNullKeys() { + CompositeAggregation composite = mock(CompositeAggregation.class); + + when(composite.getBuckets()).thenAnswer((Answer>) invocationOnMock -> { + List foos = new ArrayList<>(); + + CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class); + LinkedHashMap keys = new LinkedHashMap<>(3); + keys.put("bar.terms", null); + keys.put("abc.histogram", null); + when(bucket.getKey()).thenReturn(keys); + + Aggregations aggs = new Aggregations(Collections.emptyList()); + when(bucket.getAggregations()).thenReturn(aggs); + when(bucket.getDocCount()).thenReturn(1L); + + foos.add(bucket); + + return foos; + }); + + GroupConfig.Builder groupConfig = ConfigTestHelpers.getGroupConfig(); + groupConfig.setHisto(ConfigTestHelpers.getHisto().setFields(Collections.singletonList("abc")).build()); + + List docs = IndexerUtils.processBuckets(composite, "foo", new RollupJobStats(), groupConfig.build(), "foo"); + assertThat(docs.size(), equalTo(1)); + assertFalse(Strings.isNullOrEmpty(docs.get(0).id())); + } + + public void testMissingBuckets() throws IOException { + String indexName = randomAlphaOfLengthBetween(1, 10); + RollupJobStats stats= new RollupJobStats(0, 0, 0, 0); + + String metricField = "metric_field"; + String valueField = "value_field"; + + Directory directory = newDirectory(); + RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory); + + int numDocs = 10; + + for (int i = 0; i < numDocs; i++) { + Document document = new Document(); + + // Every other doc omit the valueField, so that we get some null buckets + if (i % 2 == 0) { + document.add(new SortedNumericDocValuesField(valueField, i)); + document.add(new LongPoint(valueField, i)); + } + document.add(new SortedNumericDocValuesField(metricField, i)); + document.add(new LongPoint(metricField, i)); + indexWriter.addDocument(document); + } + + indexWriter.close(); + + IndexReader indexReader = DirectoryReader.open(directory); + IndexSearcher indexSearcher = newIndexSearcher(indexReader); + + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + valueFieldType.setName(valueField); + valueFieldType.setHasDocValues(true); + valueFieldType.setName(valueField); + + MappedFieldType metricFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + metricFieldType.setName(metricField); + metricFieldType.setHasDocValues(true); + metricFieldType.setName(metricField); + + // Setup the composite agg + TermsGroupConfig termsGroupConfig = new TermsGroupConfig.Builder().setFields(Collections.singletonList(valueField)).build(); + CompositeAggregationBuilder compositeBuilder = new CompositeAggregationBuilder(RollupIndexer.AGGREGATION_NAME, + termsGroupConfig.toBuilders()).size(numDocs*2); + + MetricConfig metricConfig = new MetricConfig.Builder().setField(metricField).setMetrics(Collections.singletonList("max")).build(); + metricConfig.toBuilders().forEach(compositeBuilder::subAggregation); + + Aggregator 
aggregator = createAggregator(compositeBuilder, indexSearcher, valueFieldType, metricFieldType); + aggregator.preCollection(); + indexSearcher.search(new MatchAllDocsQuery(), aggregator); + aggregator.postCollection(); + CompositeAggregation composite = (CompositeAggregation) aggregator.buildAggregation(0L); + indexReader.close(); + directory.close(); + + List docs = IndexerUtils.processBuckets(composite, indexName, stats, + ConfigTestHelpers.getGroupConfig().build(), "foo"); + + assertThat(docs.size(), equalTo(6)); + for (IndexRequest doc : docs) { + Map map = doc.sourceAsMap(); + Object value = map.get(valueField + "." + TermsAggregationBuilder.NAME + "." + RollupField.VALUE); + if (value == null) { + assertThat(map.get(valueField + "." + TermsAggregationBuilder.NAME + "." + RollupField.COUNT_FIELD), equalTo(5)); + } else { + assertThat(map.get(valueField + "." + TermsAggregationBuilder.NAME + "." + RollupField.COUNT_FIELD), equalTo(1)); + } + } + } + interface Mock { List getBuckets(); } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java index 412c75f0e639c..f1d9eb1fb3f24 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java @@ -263,7 +263,7 @@ private void clearMlState() throws Exception { */ private void clearRollupState() throws Exception { if (isRollupTest()) { - new RollupRestTestStateCleaner(logger, adminClient()).clearRollupMetadata(); + RollupRestTestStateCleaner.clearRollupMetadata(adminClient()); } } diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index 5276abdbfb1d8..ba6f9e9167821 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.StreamsUtils; import org.elasticsearch.test.rest.ESRestTestCase; @@ -531,7 +532,10 @@ private void assertRollUpJob(final String rollupJob) throws Exception { // check that the rollup job is started using the RollUp API final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); Map getRollupJobResponse = toMap(client().performRequest(getRollupJobRequest)); - assertThat(ObjectPath.eval("jobs.0.status.job_state", getRollupJobResponse), expectedStates); + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), expectedStates); + } // check that the rollup job is started using the Tasks API final Request taskRequest = new Request("GET", "_tasks"); @@ -547,15 +551,27 @@ private void assertRollUpJob(final String rollupJob) throws Exception { // check that the rollup job is started using the Cluster State API final Request clusterStateRequest = new Request("GET", "_cluster/state/metadata"); Map clusterStateResponse = 
toMap(client().performRequest(clusterStateRequest)); - Map rollupJobTask = ObjectPath.eval("metadata.persistent_tasks.tasks.0", clusterStateResponse); - assertThat(ObjectPath.eval("id", rollupJobTask), equalTo("rollup-job-test")); + List> rollupJobTasks = ObjectPath.eval("metadata.persistent_tasks.tasks", clusterStateResponse); + + boolean hasRollupTask = false; + for (Map task : rollupJobTasks) { + if (ObjectPath.eval("id", task).equals(rollupJob)) { + hasRollupTask = true; + + // Persistent task state field has been renamed in 6.4.0 from "status" to "state" + final String stateFieldName + = (runningAgainstOldCluster && oldClusterVersion.before(Version.V_6_4_0)) ? "status" : "state"; - // Persistent task state field has been renamed in 6.4.0 from "status" to "state" - final String stateFieldName = (runningAgainstOldCluster && oldClusterVersion.before(Version.V_6_4_0)) ? "status" : "state"; + final String jobStateField = "task.xpack/rollup/job." + stateFieldName + ".job_state"; + assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + task.get("id"), + ObjectPath.eval(jobStateField, task), expectedStates); + break; + } + } + if (hasRollupTask == false) { + fail("Expected persistent task for [" + rollupJob + "] but none found."); + } - final String jobStateField = "task.xpack/rollup/job." + stateFieldName + ".job_state"; - assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + rollupJobTask, - ObjectPath.eval(jobStateField, rollupJobTask), expectedStates); } private void waitForRollUpJob(final String rollupJob, final Matcher expectedStates) throws Exception { @@ -563,7 +579,34 @@ private void waitForRollUpJob(final String rollupJob, final Matcher expectedS final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); Response getRollupJobResponse = client().performRequest(getRollupJobRequest); assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); - assertThat(ObjectPath.eval("jobs.0.status.job_state", toMap(getRollupJobResponse)), expectedStates); + + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), expectedStates); + } }, 30L, TimeUnit.SECONDS); } + + private Map getJob(Response response, String targetJobId) throws IOException { + return getJob(ESRestTestCase.entityAsMap(response), targetJobId); + } + + @SuppressWarnings("unchecked") + private Map getJob(Map jobsMap, String targetJobId) throws IOException { + + List> jobs = + (List>) XContentMapValues.extractValue("jobs", jobsMap); + + if (jobs == null) { + return null; + } + + for (Map job : jobs) { + String jobId = (String) ((Map) job.get("config")).get("id"); + if (jobId.equals(targetJobId)) { + return job; + } + } + return null; + } }