Skip to content

Commit c3b3dd8

Browse files
authored
Address concurrency issue in top hits aggregation (#106990)
Top hits aggregation runs the fetch phase concurrently when the query phase is executed across multiple slices. This is problematic as the fetch phase does not support concurrent execution yet. The core of the issue is that the search execution context is shared across slices, which call setLookupProviders against it concurrently, setting each time different instances of preloaded source and field lookup providers. This makes us cross streams between slices, and hit lucene assertions that ensure that stored fields loaded from a certain thread are not read from a different thread. We have not hit this before because the problem revolves around SearchLookup which is used by runtime fields. TopHitsIT is the main test we have for top hits agg, but it uses a mock script engine which bypasses painless and SearchLookup.
1 parent a28a31a commit c3b3dd8

File tree

9 files changed

+184
-115
lines changed

9 files changed

+184
-115
lines changed

docs/changelog/106990.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 106990
2+
summary: Address concurrency issue in top hits aggregation
3+
area: Aggregations
4+
type: bug
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/terms/RareTermsIT.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,22 @@
1212
import org.elasticsearch.action.index.IndexRequest;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
1414
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.search.SearchHit;
16+
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
17+
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
18+
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
19+
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
20+
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
21+
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
1522
import org.elasticsearch.test.ESSingleNodeTestCase;
1623
import org.elasticsearch.xcontent.XContentType;
1724
import org.hamcrest.Matchers;
1825

26+
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
1927
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
2028
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
29+
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.greaterThan;
2131

2232
/**
2333
* Test that index enough data to trigger the creation of Cuckoo filters.
@@ -64,4 +74,33 @@ private void assertNumRareTerms(int maxDocs, int rareTerms) {
6474
}
6575
);
6676
}
77+
78+
public void testGlobalAggregationWithScore() {
79+
createIndex("global", Settings.EMPTY, "_doc", "keyword", "type=keyword");
80+
prepareIndex("global").setSource("keyword", "a").setRefreshPolicy(IMMEDIATE).get();
81+
prepareIndex("global").setSource("keyword", "c").setRefreshPolicy(IMMEDIATE).get();
82+
prepareIndex("global").setSource("keyword", "e").setRefreshPolicy(IMMEDIATE).get();
83+
GlobalAggregationBuilder globalBuilder = new GlobalAggregationBuilder("global").subAggregation(
84+
new RareTermsAggregationBuilder("terms").field("keyword")
85+
.subAggregation(
86+
new RareTermsAggregationBuilder("sub_terms").field("keyword")
87+
.subAggregation(new TopHitsAggregationBuilder("top_hits").storedField("_none_"))
88+
)
89+
);
90+
assertNoFailuresAndResponse(client().prepareSearch("global").addAggregation(globalBuilder), response -> {
91+
InternalGlobal result = response.getAggregations().get("global");
92+
InternalMultiBucketAggregation<?, ?> terms = result.getAggregations().get("terms");
93+
assertThat(terms.getBuckets().size(), equalTo(3));
94+
for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
95+
InternalMultiBucketAggregation<?, ?> subTerms = bucket.getAggregations().get("sub_terms");
96+
assertThat(subTerms.getBuckets().size(), equalTo(1));
97+
MultiBucketsAggregation.Bucket subBucket = subTerms.getBuckets().get(0);
98+
InternalTopHits topHits = subBucket.getAggregations().get("top_hits");
99+
assertThat(topHits.getHits().getHits().length, equalTo(1));
100+
for (SearchHit hit : topHits.getHits()) {
101+
assertThat(hit.getScore(), greaterThan(0f));
102+
}
103+
}
104+
});
105+
}
67106
}

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsIT.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,24 @@
1818
import org.elasticsearch.plugins.Plugin;
1919
import org.elasticsearch.script.Script;
2020
import org.elasticsearch.script.ScriptType;
21+
import org.elasticsearch.search.SearchHit;
2122
import org.elasticsearch.search.aggregations.AggregationExecutionException;
2223
import org.elasticsearch.search.aggregations.AggregationTestScriptsPlugin;
24+
import org.elasticsearch.search.aggregations.Aggregator;
2325
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
2426
import org.elasticsearch.search.aggregations.BucketOrder;
27+
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
2528
import org.elasticsearch.search.aggregations.bucket.AbstractTermsTestCase;
29+
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
2630
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
31+
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
32+
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
2733
import org.elasticsearch.search.aggregations.metrics.Avg;
2834
import org.elasticsearch.search.aggregations.metrics.ExtendedStats;
35+
import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
2936
import org.elasticsearch.search.aggregations.metrics.Stats;
3037
import org.elasticsearch.search.aggregations.metrics.Sum;
38+
import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
3139
import org.elasticsearch.search.aggregations.support.ValueType;
3240
import org.elasticsearch.search.builder.SearchSourceBuilder;
3341
import org.elasticsearch.test.ESIntegTestCase;
@@ -63,6 +71,7 @@
6371
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
6472
import static org.hamcrest.Matchers.containsString;
6573
import static org.hamcrest.Matchers.equalTo;
74+
import static org.hamcrest.Matchers.greaterThan;
6675
import static org.hamcrest.Matchers.instanceOf;
6776
import static org.hamcrest.Matchers.startsWith;
6877
import static org.hamcrest.core.IsNull.notNullValue;
@@ -1376,4 +1385,46 @@ private void assertOrderByKeyResponse(
13761385
}
13771386
);
13781387
}
1388+
1389+
public void testGlobalAggregationWithScore() throws Exception {
1390+
assertAcked(prepareCreate("global").setMapping("keyword", "type=keyword"));
1391+
indexRandom(
1392+
true,
1393+
prepareIndex("global").setSource("keyword", "a"),
1394+
prepareIndex("global").setSource("keyword", "c"),
1395+
prepareIndex("global").setSource("keyword", "e")
1396+
);
1397+
String executionHint = randomFrom(TermsAggregatorFactory.ExecutionMode.values()).toString();
1398+
Aggregator.SubAggCollectionMode collectionMode = randomFrom(Aggregator.SubAggCollectionMode.values());
1399+
GlobalAggregationBuilder globalBuilder = new GlobalAggregationBuilder("global").subAggregation(
1400+
new TermsAggregationBuilder("terms").userValueTypeHint(ValueType.STRING)
1401+
.executionHint(executionHint)
1402+
.collectMode(collectionMode)
1403+
.field("keyword")
1404+
.order(BucketOrder.key(true))
1405+
.subAggregation(
1406+
new TermsAggregationBuilder("sub_terms").userValueTypeHint(ValueType.STRING)
1407+
.executionHint(executionHint)
1408+
.collectMode(collectionMode)
1409+
.field("keyword")
1410+
.order(BucketOrder.key(true))
1411+
.subAggregation(new TopHitsAggregationBuilder("top_hits").storedField("_none_"))
1412+
)
1413+
);
1414+
assertNoFailuresAndResponse(prepareSearch("global").addAggregation(globalBuilder), response -> {
1415+
InternalGlobal result = response.getAggregations().get("global");
1416+
InternalMultiBucketAggregation<?, ?> terms = result.getAggregations().get("terms");
1417+
assertThat(terms.getBuckets().size(), equalTo(3));
1418+
for (MultiBucketsAggregation.Bucket bucket : terms.getBuckets()) {
1419+
InternalMultiBucketAggregation<?, ?> subTerms = bucket.getAggregations().get("sub_terms");
1420+
assertThat(subTerms.getBuckets().size(), equalTo(1));
1421+
MultiBucketsAggregation.Bucket subBucket = subTerms.getBuckets().get(0);
1422+
InternalTopHits topHits = subBucket.getAggregations().get("top_hits");
1423+
assertThat(topHits.getHits().getHits().length, equalTo(1));
1424+
for (SearchHit hit : topHits.getHits()) {
1425+
assertThat(hit.getScore(), greaterThan(0f));
1426+
}
1427+
}
1428+
});
1429+
}
13791430
}

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88
package org.elasticsearch.search.aggregations.metrics;
99

10+
import org.apache.lucene.index.LeafReaderContext;
1011
import org.apache.lucene.search.Explanation;
1112
import org.apache.lucene.search.join.ScoreMode;
1213
import org.apache.lucene.util.ArrayUtil;
@@ -20,6 +21,7 @@
2021
import org.elasticsearch.index.query.QueryBuilders;
2122
import org.elasticsearch.index.seqno.SequenceNumbers;
2223
import org.elasticsearch.plugins.Plugin;
24+
import org.elasticsearch.plugins.SearchPlugin;
2325
import org.elasticsearch.script.MockScriptEngine;
2426
import org.elasticsearch.script.MockScriptPlugin;
2527
import org.elasticsearch.script.Script;
@@ -34,15 +36,21 @@
3436
import org.elasticsearch.search.aggregations.bucket.nested.Nested;
3537
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
3638
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
39+
import org.elasticsearch.search.fetch.FetchSubPhase;
40+
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
41+
import org.elasticsearch.search.fetch.StoredFieldsSpec;
3742
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
3843
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
44+
import org.elasticsearch.search.lookup.FieldLookup;
45+
import org.elasticsearch.search.lookup.LeafSearchLookup;
3946
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
4047
import org.elasticsearch.search.sort.ScriptSortBuilder.ScriptSortType;
4148
import org.elasticsearch.search.sort.SortBuilders;
4249
import org.elasticsearch.search.sort.SortOrder;
4350
import org.elasticsearch.test.ESIntegTestCase;
4451
import org.elasticsearch.xcontent.XContentBuilder;
4552

53+
import java.io.IOException;
4654
import java.util.ArrayList;
4755
import java.util.Collection;
4856
import java.util.Collections;
@@ -87,7 +95,7 @@ public class TopHitsIT extends ESIntegTestCase {
8795

8896
@Override
8997
protected Collection<Class<? extends Plugin>> nodePlugins() {
90-
return Collections.singleton(CustomScriptPlugin.class);
98+
return List.of(CustomScriptPlugin.class, FetchPlugin.class);
9199
}
92100

93101
public static class CustomScriptPlugin extends MockScriptPlugin {
@@ -110,7 +118,7 @@ public static String randomExecutionHint() {
110118

111119
@Override
112120
public void setupSuiteScopeCluster() throws Exception {
113-
assertAcked(prepareCreate("idx").setMapping(TERMS_AGGS_FIELD, "type=keyword"));
121+
assertAcked(prepareCreate("idx").setMapping(TERMS_AGGS_FIELD, "type=keyword", "text", "type=text,store=true"));
114122
assertAcked(prepareCreate("field-collapsing").setMapping("group", "type=keyword"));
115123
createIndex("empty");
116124
assertAcked(
@@ -592,7 +600,7 @@ public void testFieldCollapsing() throws Exception {
592600
);
593601
}
594602

595-
public void testFetchFeatures() {
603+
public void testFetchFeatures() throws IOException {
596604
final boolean seqNoAndTerm = randomBoolean();
597605
assertNoFailuresAndResponse(
598606
prepareSearch("idx").setQuery(matchQuery("text", "text").queryName("test"))
@@ -642,19 +650,14 @@ public void testFetchFeatures() {
642650

643651
assertThat(hit.getMatchedQueries()[0], equalTo("test"));
644652

645-
DocumentField field1 = hit.field("field1");
646-
assertThat(field1.getValue(), equalTo(5L));
647-
648-
DocumentField field2 = hit.field("field2");
649-
assertThat(field2.getValue(), equalTo(2.71f));
650-
651-
assertThat(hit.getSourceAsMap().get("text").toString(), equalTo("some text to entertain"));
652-
653-
field2 = hit.field("script");
654-
assertThat(field2.getValue().toString(), equalTo("5"));
653+
assertThat(hit.field("field1").getValue(), equalTo(5L));
654+
assertThat(hit.field("field2").getValue(), equalTo(2.71f));
655+
assertThat(hit.field("script").getValue().toString(), equalTo("5"));
655656

656657
assertThat(hit.getSourceAsMap().size(), equalTo(1));
657658
assertThat(hit.getSourceAsMap().get("text").toString(), equalTo("some text to entertain"));
659+
assertEquals("some text to entertain", hit.getFields().get("text").getValue());
660+
assertEquals("some text to entertain", hit.getFields().get("text_stored_lookup").getValue());
658661
}
659662
}
660663
);
@@ -1263,4 +1266,37 @@ public void testWithRescore() {
12631266
}
12641267
);
12651268
}
1269+
1270+
public static class FetchPlugin extends Plugin implements SearchPlugin {
1271+
@Override
1272+
public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) {
1273+
return Collections.singletonList(fetchContext -> {
1274+
if (fetchContext.getIndexName().equals("idx")) {
1275+
return new FetchSubPhaseProcessor() {
1276+
1277+
private LeafSearchLookup leafSearchLookup;
1278+
1279+
@Override
1280+
public void setNextReader(LeafReaderContext ctx) {
1281+
leafSearchLookup = fetchContext.getSearchExecutionContext().lookup().getLeafSearchLookup(ctx);
1282+
}
1283+
1284+
@Override
1285+
public void process(FetchSubPhase.HitContext hitContext) {
1286+
leafSearchLookup.setDocument(hitContext.docId());
1287+
FieldLookup fieldLookup = leafSearchLookup.fields().get("text");
1288+
hitContext.hit()
1289+
.setDocumentField("text_stored_lookup", new DocumentField("text_stored_lookup", fieldLookup.getValues()));
1290+
}
1291+
1292+
@Override
1293+
public StoredFieldsSpec storedFieldsSpec() {
1294+
return StoredFieldsSpec.NO_REQUIREMENTS;
1295+
}
1296+
};
1297+
}
1298+
return null;
1299+
});
1300+
}
1301+
}
12661302
}

server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregator.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.elasticsearch.common.util.LongObjectPagedHashMap;
3030
import org.elasticsearch.common.util.LongObjectPagedHashMap.Cursor;
3131
import org.elasticsearch.core.Releasables;
32+
import org.elasticsearch.index.query.SearchExecutionContext;
3233
import org.elasticsearch.search.SearchHit;
3334
import org.elasticsearch.search.SearchHits;
3435
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -191,8 +192,7 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
191192
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
192193
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
193194
}
194-
subSearchContext.fetchPhase().execute(subSearchContext, docIdsToLoad);
195-
FetchSearchResult fetchResult = subSearchContext.fetchResult();
195+
FetchSearchResult fetchResult = runFetchPhase(subSearchContext, docIdsToLoad);
196196
if (fetchProfiles != null) {
197197
fetchProfiles.add(fetchResult.profileResult());
198198
}
@@ -216,6 +216,19 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
216216
);
217217
}
218218

219+
private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) {
220+
// Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet.
221+
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext());
222+
SubSearchContext fetchSubSearchContext = new SubSearchContext(subSearchContext) {
223+
@Override
224+
public SearchExecutionContext getSearchExecutionContext() {
225+
return searchExecutionContext;
226+
}
227+
};
228+
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad);
229+
return fetchSubSearchContext.fetchResult();
230+
}
231+
219232
@Override
220233
public InternalTopHits buildEmptyAggregation() {
221234
TopDocs topDocs;

server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ private SearchHits buildSearchHits(SearchContext context, int[] docIdsToLoad, Pr
104104

105105
PreloadedSourceProvider sourceProvider = new PreloadedSourceProvider();
106106
PreloadedFieldLookupProvider fieldLookupProvider = new PreloadedFieldLookupProvider();
107+
// The following relies on the fact that we fetch sequentially one segment after another, from a single thread
108+
// This needs to be revised once we add concurrency to the fetch phase, and needs a work-around for situations
109+
// where we run fetch as part of the query phase, where inter-segment concurrency is leveraged.
110+
// One problem is the global setLookupProviders call against the shared execution context.
111+
// Another problem is that the above provider implementations are not thread-safe
107112
context.getSearchExecutionContext().setLookupProviders(sourceProvider, ctx -> fieldLookupProvider);
108113

109114
List<FetchSubPhaseProcessor> processors = getProcessors(context.shardTarget(), fetchContext, profiler);

server/src/main/java/org/elasticsearch/search/internal/SubSearchContext.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public class SubSearchContext extends FilteredSearchContext {
3030

3131
// By default return 3 hits per bucket. A higher default would make the response really large by default, since
32-
// the to hits are returned per bucket.
32+
// the top hits are returned per bucket.
3333
private static final int DEFAULT_SIZE = 3;
3434

3535
private int from;
@@ -62,6 +62,25 @@ public SubSearchContext(SearchContext context) {
6262
this.querySearchResult = new QuerySearchResult();
6363
}
6464

65+
public SubSearchContext(SubSearchContext subSearchContext) {
66+
this((SearchContext) subSearchContext);
67+
this.from = subSearchContext.from;
68+
this.size = subSearchContext.size;
69+
this.sort = subSearchContext.sort;
70+
this.parsedQuery = subSearchContext.parsedQuery;
71+
this.query = subSearchContext.query;
72+
this.storedFields = subSearchContext.storedFields;
73+
this.scriptFields = subSearchContext.scriptFields;
74+
this.fetchSourceContext = subSearchContext.fetchSourceContext;
75+
this.docValuesContext = subSearchContext.docValuesContext;
76+
this.fetchFieldsContext = subSearchContext.fetchFieldsContext;
77+
this.highlight = subSearchContext.highlight;
78+
this.explain = subSearchContext.explain;
79+
this.trackScores = subSearchContext.trackScores;
80+
this.version = subSearchContext.version;
81+
this.seqNoAndPrimaryTerm = subSearchContext.seqNoAndPrimaryTerm;
82+
}
83+
6584
@Override
6685
public void preProcess() {}
6786

0 commit comments

Comments
 (0)