Skip to content

Fix inner hits + aggregations concurrency bug #128036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/128036.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 128036
summary: Fix inner hits + aggregations concurrency bug
area: Search
type: bug
issues:
- 122419
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.FieldSortBuilder;
Expand All @@ -51,6 +53,7 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.join.query.JoinQueryBuilders.hasChildQuery;
import static org.elasticsearch.join.query.JoinQueryBuilders.hasParentQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCountAndNoFailures;
Expand All @@ -64,6 +67,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -698,4 +702,68 @@ public void testTooHighResultWindow() {
)
);
}

public void testTopHitsOnParentChild() throws Exception {
assertAcked(
prepareCreate("idx").setMapping(
jsonBuilder().startObject()
.startObject("_doc")
.startObject("properties")
.startObject("id")
.field("type", "keyword")
.endObject()
.startObject("join_field")
.field("type", "join")
.startObject("relations")
.field("parent", new String[] { "child1", "child2" })
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()
)
);
ensureGreen("idx");

List<IndexRequestBuilder> requestBuilders = new ArrayList<>();
int numDocs = scaledRandomIntBetween(10, 100);
int child1 = 0;
int child2 = 0;
int[] child1InnerObjects = new int[numDocs];
int[] child2InnerObjects = new int[numDocs];
for (int parent = 0; parent < numDocs; parent++) {
String parentId = String.format(Locale.ENGLISH, "p_%03d", parent);
requestBuilders.add(createIndexRequest("idx", "parent", parentId, null));

int numChildDocs = child1InnerObjects[parent] = scaledRandomIntBetween(1, numDocs);
int limit = child1 + numChildDocs;
for (; child1 < limit; child1++) {
requestBuilders.add(createIndexRequest("idx", "child1", String.format(Locale.ENGLISH, "c1_%04d", child1), parentId));
}
numChildDocs = child2InnerObjects[parent] = scaledRandomIntBetween(1, numDocs);
limit = child2 + numChildDocs;
for (; child2 < limit; child2++) {
requestBuilders.add(createIndexRequest("idx", "child2", String.format(Locale.ENGLISH, "c2_%04d", child2), parentId));
}
}

indexRandom(true, requestBuilders);
ensureSearchable();

QueryBuilder hasChildQuery = hasChildQuery("child2", matchAllQuery(), ScoreMode.None).innerHit(new InnerHitBuilder().setSize(2));
AggregationBuilder topHitsAgg = topHits("top-children").size(3);

assertNoFailuresAndResponse(prepareSearch("idx").setQuery(hasChildQuery).addAggregation(topHitsAgg), response -> {
assertHitCount(response, numDocs);

TopHits topHits = response.getAggregations().get("top-children");
SearchHits hits = topHits.getHits();
assertThat(hits.getHits().length, equalTo(3));

for (SearchHit hit : hits) {
SearchHits innerHits = hit.getInnerHits().get("child2");
assertThat(innerHits.getHits().length, lessThanOrEqualTo(2));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,35 @@ static final class JoinFieldInnerHitSubContext extends InnerHitsContext.InnerHit
private final String typeName;
private final boolean fetchChildInnerHits;
private final Joiner joiner;
private final SearchExecutionContext searchExecutionContext;

JoinFieldInnerHitSubContext(String name, SearchContext context, String typeName, boolean fetchChildInnerHits, Joiner joiner) {
super(name, context);
this.typeName = typeName;
this.fetchChildInnerHits = fetchChildInnerHits;
this.joiner = joiner;
this.searchExecutionContext = null;
}

JoinFieldInnerHitSubContext(
JoinFieldInnerHitSubContext joinFieldInnerHitSubContext,
SearchExecutionContext searchExecutionContext
) {
super(joinFieldInnerHitSubContext);
this.typeName = joinFieldInnerHitSubContext.typeName;
this.fetchChildInnerHits = joinFieldInnerHitSubContext.fetchChildInnerHits;
this.joiner = joinFieldInnerHitSubContext.joiner;
this.searchExecutionContext = searchExecutionContext;
}

@Override
public JoinFieldInnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext) {
return new JoinFieldInnerHitSubContext(this, searchExecutionContext);
}

@Override
public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext != null ? searchExecutionContext : super.getSearchExecutionContext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.plugins.Plugin;
Expand All @@ -30,6 +32,7 @@
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
Expand All @@ -45,6 +48,7 @@
import org.elasticsearch.search.lookup.FieldLookup;
import org.elasticsearch.search.lookup.LeafSearchLookup;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.sort.NestedSortBuilder;
import org.elasticsearch.search.sort.ScriptSortBuilder.ScriptSortType;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
Expand Down Expand Up @@ -983,6 +987,67 @@ public void testTopHitsInNested() throws Exception {
);
}

public void testTopHitsOnInnerHits() {
QueryBuilder nestedQuery = nestedQuery("comments", matchQuery("comments.message", "text"), ScoreMode.Avg).innerHit(
new InnerHitBuilder().setSize(2)
);
AggregationBuilder topHitsAgg = topHits("top-comments").size(3)
.sort(SortBuilders.fieldSort("comments.date").order(SortOrder.ASC).setNestedSort(new NestedSortBuilder("comments")));

assertNoFailuresAndResponse(prepareSearch("articles").setQuery(nestedQuery).addAggregation(topHitsAgg), response -> {
TopHits topHits = response.getAggregations().get("top-comments");
SearchHits hits = topHits.getHits();
assertThat(hits.getHits().length, equalTo(3));

for (SearchHit hit : hits) {
SearchHits innerHits = hit.getInnerHits().get("comments");
assertThat(innerHits.getHits().length, lessThanOrEqualTo(2));
for (SearchHit innerHit : innerHits) {
assertThat(innerHit.getNestedIdentity().getField().string(), equalTo("comments"));
Map<String, Object> source = innerHit.getSourceAsMap();
assertTrue(source.containsKey("message"));
assertFalse(source.containsKey("reviewers"));
}
}
});
}

public void testTopHitsOnMultipleNestedInnerHits() {
QueryBuilder doubleNestedQuery = nestedQuery(
"comments",
nestedQuery("comments.reviewers", matchQuery("comments.reviewers.name", "user c"), ScoreMode.Avg).innerHit(
new InnerHitBuilder()
),
ScoreMode.Avg
).innerHit(new InnerHitBuilder("review"));
AggregationBuilder topHitsAgg = topHits("top-reviewers").size(2)
.sort(SortBuilders.fieldSort("comments.date").order(SortOrder.ASC).setNestedSort(new NestedSortBuilder("comments")));

assertNoFailuresAndResponse(prepareSearch("articles").setQuery(doubleNestedQuery).addAggregation(topHitsAgg), response -> {
TopHits topHits = response.getAggregations().get("top-reviewers");
SearchHits hits = topHits.getHits();
assertThat(hits.getHits().length, equalTo(1));

SearchHit hit = hits.getAt(0);
SearchHits innerHits = hit.getInnerHits().get("review");
assertThat(innerHits.getHits().length, equalTo(2));

assertThat(innerHits.getAt(0).getId(), equalTo("1"));
assertThat(innerHits.getAt(0).getNestedIdentity().getField().string(), equalTo("comments"));
assertThat(innerHits.getAt(0).getNestedIdentity().getOffset(), equalTo(0));
Map<String, Object> source0 = innerHits.getAt(0).getSourceAsMap();
assertTrue(source0.containsKey("message"));
assertTrue(source0.containsKey("reviewers"));

assertThat(innerHits.getAt(1).getId(), equalTo("1"));
assertThat(innerHits.getAt(1).getNestedIdentity().getField().string(), equalTo("comments"));
assertThat(innerHits.getAt(1).getNestedIdentity().getOffset(), equalTo(1));
Map<String, Object> source1 = innerHits.getAt(1).getSourceAsMap();
assertTrue(source1.containsKey("message"));
assertTrue(source1.containsKey("reviewers"));
});
}

public void testUseMaxDocInsteadOfSize() throws Exception {
updateIndexSettings(
Settings.builder().put(IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING.getKey(), ArrayUtil.MAX_ARRAY_LENGTH),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ static final class NestedInnerHitSubContext extends InnerHitsContext.InnerHitSub

private final NestedObjectMapper parentObjectMapper;
private final NestedObjectMapper childObjectMapper;
private final SearchExecutionContext searchExecutionContext;

NestedInnerHitSubContext(
String name,
Expand All @@ -406,6 +407,24 @@ static final class NestedInnerHitSubContext extends InnerHitsContext.InnerHitSub
super(name, context);
this.parentObjectMapper = parentObjectMapper;
this.childObjectMapper = childObjectMapper;
this.searchExecutionContext = null;
}

NestedInnerHitSubContext(NestedInnerHitSubContext nestedInnerHitSubContext, SearchExecutionContext searchExecutionContext) {
super(nestedInnerHitSubContext);
this.parentObjectMapper = nestedInnerHitSubContext.parentObjectMapper;
this.childObjectMapper = nestedInnerHitSubContext.childObjectMapper;
this.searchExecutionContext = searchExecutionContext;
}

@Override
public NestedInnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext) {
return new NestedInnerHitSubContext(this, searchExecutionContext);
}

@Override
public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext != null ? searchExecutionContext : super.getSearchExecutionContext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.subphase.InnerHitsContext;
import org.elasticsearch.search.fetch.subphase.InnerHitsContext.InnerHitSubContext;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.search.profile.ProfileResult;
import org.elasticsearch.search.rescore.RescoreContext;
Expand Down Expand Up @@ -223,16 +225,40 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) {
// Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet.
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext());
// InnerHitSubContext is not thread-safe, so we fork it as well to support concurrent execution
InnerHitsContext innerHitsContext = new InnerHitsContext(
getForkedInnerHits(subSearchContext.innerHits().getInnerHits(), searchExecutionContext)
);

SubSearchContext fetchSubSearchContext = new SubSearchContext(subSearchContext) {
@Override
public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext;
}

@Override
public InnerHitsContext innerHits() {
return innerHitsContext;
}
};

fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null);
return fetchSubSearchContext.fetchResult();
}

private static Map<String, InnerHitSubContext> getForkedInnerHits(
Map<String, InnerHitSubContext> originalInnerHits,
SearchExecutionContext searchExecutionContext
) {
Map<String, InnerHitSubContext> forkedInnerHits = new HashMap<>();
for (Map.Entry<String, InnerHitSubContext> entry : originalInnerHits.entrySet()) {
var forkedContext = entry.getValue().copyWithSearchExecutionContext(searchExecutionContext);
forkedInnerHits.put(entry.getKey(), forkedContext);
}

return forkedInnerHits;
}

@Override
public InternalTopHits buildEmptyAggregation() {
TopDocs topDocs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SubSearchContext;
Expand All @@ -44,7 +45,7 @@ public InnerHitsContext() {
this.innerHits = new HashMap<>();
}

InnerHitsContext(Map<String, InnerHitSubContext> innerHits) {
public InnerHitsContext(Map<String, InnerHitSubContext> innerHits) {
this.innerHits = Objects.requireNonNull(innerHits);
}

Expand Down Expand Up @@ -84,6 +85,14 @@ protected InnerHitSubContext(String name, SearchContext context) {
this.context = context;
}

public InnerHitSubContext(InnerHitSubContext innerHitSubContext) {
super(innerHitSubContext);
this.name = innerHitSubContext.name;
this.context = innerHitSubContext.context;
}

public abstract InnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext);

public abstract TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException;

public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase;
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
import org.elasticsearch.search.fetch.subphase.InnerHitsContext;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
Expand Down Expand Up @@ -525,6 +526,7 @@ private SubSearchContext buildSubSearchContext(
when(ctx.indexShard()).thenReturn(indexShard);
when(ctx.newSourceLoader()).thenAnswer(inv -> searchExecutionContext.newSourceLoader(false));
when(ctx.newIdLoader()).thenReturn(IdLoader.fromLeafStoredFieldLoader());
when(ctx.innerHits()).thenReturn(new InnerHitsContext());
var res = new SubSearchContext(ctx);
releasables.add(res); // TODO: nasty workaround for not getting the standard resource handling behavior of a real search context
return res;
Expand Down
Loading