Skip to content

[8.0] [Transform] Fix condition on which the transform stops processing buckets (#82852) #83136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit on Feb 11, 2022 (source and target branch names were lost in extraction)
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/82852.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 82852
summary: Fix condition on which the transform stops processing buckets
area: Transform
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testSimplePivot() throws Exception {

public void testSimpleDataStreamPivot() throws Exception {
String indexName = "reviews_data_stream";
createReviewsIndex(indexName, 1000, "date", true, -1, null);
createReviewsIndex(indexName, 1000, 27, "date", true, -1, null);
String transformId = "simple_data_stream_pivot";
String transformIndex = "pivot_reviews_data_stream";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
Expand Down Expand Up @@ -363,7 +363,7 @@ public void testBucketSelectorPivot() throws Exception {

public void testContinuousPivot() throws Exception {
String indexName = "continuous_reviews";
createReviewsIndex(indexName, 1000, "date", false, 5, "user_id");
createReviewsIndex(indexName, 1000, 27, "date", false, 5, "user_id");
String transformId = "simple_continuous_pivot";
String transformIndex = "pivot_reviews_continuous";
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
Expand Down Expand Up @@ -1283,7 +1283,7 @@ public void testPivotWithGeoBoundsAgg() throws Exception {
String indexName = "reviews_geo_bounds";

// gh#71874 regression test: create some sparse data
createReviewsIndex(indexName, 1000, "date", false, 5, "location");
createReviewsIndex(indexName, 1000, 27, "date", false, 5, "location");

setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
private static boolean indicesCreated = false;
Expand Down Expand Up @@ -235,4 +236,70 @@ public void testSparseDataPercentiles() throws Exception {
assertTrue(percentilesEmpty.containsKey("99"));
assertNull(percentilesEmpty.get("99"));
}

/**
* This test verifies that regardless of the max_page_search_size setting value used, the transform works correctly in the face of
* restrictive bucket selector.
* In the past there was a problem when there were no buckets (because bucket selector filtered them out) in a composite aggregation
* page and for small enough max_page_search_size the transform stopped prematurely.
* The problem was fixed by https://github.com/elastic/elasticsearch/pull/82852 and this test serves as a regression test for this PR.
*/
public void testRestrictiveBucketSelector() throws Exception {
String indexName = "special_pivot_bucket_selector_reviews";
createReviewsIndex(indexName, 1000, 327, "date", false, 5, "affiliate_id");

verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10", 10, 14);
verifyDestIndexHitsCount(indexName, "special_pivot_bucket_selector-10000", 10000, 14);
}

private void verifyDestIndexHitsCount(String sourceIndex, String transformId, int maxPageSearchSize, long expectedDestIndexCount)
throws Exception {
String transformIndex = transformId;
String config = """
{
"source": {
"index": "%s"
},
"dest": {
"index": "%s"
},
"frequency": "1m",
"pivot": {
"group_by": {
"user_id": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"stars_sum": {
"sum": {
"field": "stars"
}
},
"bs": {
"bucket_selector": {
"buckets_path": {
"stars_sum": "stars_sum.value"
},
"script": "params.stars_sum > 20"
}
}
}
},
"settings": {
"max_page_search_size": %s
}
}""".formatted(sourceIndex, transformIndex, maxPageSearchSize);
Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForTransform(transformId, transformIndex);
assertTrue(indexExists(transformIndex));
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search");
long count = (Integer) XContentMapValues.extractValue("hits.total.value", searchResult);
assertThat(count, is(equalTo(expectedDestIndexCount)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected Settings restClientSettings() {
protected void createReviewsIndex(
String indexName,
int numDocs,
int numUsers,
String dateType,
boolean isDataStream,
int userWithMissingBuckets,
Expand All @@ -73,7 +74,7 @@ protected void createReviewsIndex(
bulk.append("""
{"create":{"_index":"%s"}}
""".formatted(indexName));
long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % 27);
long user = Math.round(Math.pow(i * 31 % 1000, distributionTable[i % distributionTable.length]) % numUsers);
int stars = distributionTable[(i * 33) % distributionTable.length];
long business = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 13);
long affiliate = Math.round(Math.pow(user * stars, distributionTable[i % distributionTable.length]) % 11);
Expand Down Expand Up @@ -201,7 +202,7 @@ protected void createReviewsIndex() throws IOException {
}

protected void createReviewsIndex(String indexName) throws IOException {
createReviewsIndex(indexName, 1000, "date", false, 5, "affiliate_id");
createReviewsIndex(indexName, 1000, 27, "date", false, 5, "affiliate_id");
}

protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException {
Expand All @@ -214,7 +215,7 @@ protected void createPivotReviewsTransform(String transformId, String transformI
}

protected void createReviewsIndexNano() throws IOException {
createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, "date_nanos", false, -1, null);
createReviewsIndex(REVIEWS_DATE_NANO_INDEX_NAME, 1000, 27, "date_nanos", false, -1, null);
}

protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void cleanUpPotentiallyFailedTransform() throws Exception {

public void testForceStopFailedTransform() throws Exception {
String transformId = "test-force-stop-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null);
createReviewsIndex(REVIEWS_INDEX_NAME, 10, 27, "date", false, -1, null);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);
Expand Down Expand Up @@ -94,7 +94,7 @@ public void testForceStopFailedTransform() throws Exception {

public void testStartFailedTransform() throws Exception {
String transformId = "test-force-start-failed-transform";
createReviewsIndex(REVIEWS_INDEX_NAME, 10, "date", false, -1, null);
createReviewsIndex(REVIEWS_INDEX_NAME, 10, 27, "date", false, -1, null);
String transformIndex = "failure_pivot_reviews";
createDestinationIndexWithBadMapping(transformIndex);
createContinuousPivotReviewsTransform(transformId, transformIndex, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
}

CompositeAggregation compositeAgg = aggregations.get(COMPOSITE_AGGREGATION_NAME);
if (compositeAgg == null || compositeAgg.getBuckets().isEmpty()) {
if (compositeAgg == null || compositeAgg.afterKey() == null) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.xcontent.DeprecationHandler;
Expand All @@ -34,12 +36,14 @@
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.transform.TransformDeprecations;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
import org.elasticsearch.xpack.spatial.SpatialPlugin;
import org.elasticsearch.xpack.transform.Transform;
import org.elasticsearch.xpack.transform.transforms.Function;
Expand All @@ -51,6 +55,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -63,7 +68,11 @@
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class PivotTests extends ESTestCase {

Expand Down Expand Up @@ -218,6 +227,47 @@ public void testGetPerformanceCriticalFields() throws IOException {
assertThat(pivot.getPerformanceCriticalFields(), contains("field-A", "field-B", "field-C"));
}

public void testProcessSearchResponse() {
Function pivot = new Pivot(
PivotConfigTests.randomPivotConfig(),
SettingsConfigTests.randomSettingsConfig(),
Version.CURRENT,
Collections.emptySet()
);

Aggregations aggs = null;
assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));

aggs = new Aggregations(List.of());
assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));

CompositeAggregation compositeAgg = mock(CompositeAggregation.class);
when(compositeAgg.getName()).thenReturn("_transform");
when(compositeAgg.getBuckets()).thenReturn(List.of());
when(compositeAgg.afterKey()).thenReturn(null);
aggs = new Aggregations(List.of(compositeAgg));
assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));

when(compositeAgg.getBuckets()).thenReturn(List.of());
when(compositeAgg.afterKey()).thenReturn(Map.of("key", "value"));
aggs = new Aggregations(List.of(compositeAgg));
// Empty bucket list is *not* a stop condition for composite agg processing.
assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(notNullValue()));

CompositeAggregation.Bucket bucket = mock(CompositeAggregation.Bucket.class);
List<? extends CompositeAggregation.Bucket> buckets = List.of(bucket);
doReturn(buckets).when(compositeAgg).getBuckets();
when(compositeAgg.afterKey()).thenReturn(null);
aggs = new Aggregations(List.of(compositeAgg));
assertThat(pivot.processSearchResponse(searchResponseFromAggs(aggs), null, null, null, null, null), is(nullValue()));
}

private static SearchResponse searchResponseFromAggs(Aggregations aggs) {
SearchResponseSections sections = new SearchResponseSections(null, aggs, null, false, null, null, 1);
SearchResponse searchResponse = new SearchResponse(sections, null, 10, 5, 0, 0, new ShardSearchFailure[0], null);
return searchResponse;
}

private class MyMockClient extends NoOpClient {
MyMockClient(String testName) {
super(testName);
Expand Down