Skip to content

NPE on batched query execution when the request is part of PIT with alias filters #128552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/128552.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128552
summary: Fix - NPE on batched query execution when the request is part of PIT with alias filters
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -126,6 +127,42 @@ public void testBasic() {
}
}

public void testIndexWithFilteredAlias() {
String indexName = "index_1";
String alias = "alias_1";
assertAcked(
indicesAdmin().prepareCreate(indexName)
.setSettings(indexSettings(10, 0))
.addAlias(new Alias(alias).filter("{\"term\":{\"tag\":\"a\"}}"))
);

int numDocs = randomIntBetween(50, 150);
int countTagA = 0;
for (int i = 0; i < numDocs; i++) {
boolean isA = randomBoolean();
if (isA) countTagA++;
prepareIndex(indexName).setId(Integer.toString(i)).setSource("tag", isA ? "a" : "b").get();
}

refresh(indexName);
BytesReference pitId = openPointInTime(new String[] { alias }, TimeValue.timeValueMinutes(1)).getPointInTimeId();

try {
int finalCountTagA = countTagA;
assertResponse(
prepareSearch().setPointInTime(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueMinutes(1)))
.setSize(0)
.setQuery(new MatchAllQueryBuilder()),
resp1 -> {
assertThat(resp1.pointInTimeId(), equalTo(pitId));
assertHitCount(resp1, finalCountTagA);
}
);
} finally {
closePointInTime(pitId);
}
}

public void testMultipleIndices() {
int numIndices = randomIntBetween(1, 5);
for (int i = 1; i <= numIndices; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,14 @@ private static ShardSearchRequest tryRewriteWithUpdatedSortValue(
return request;
}

private static boolean isPartOfPIT(SearchRequest request, ShardSearchContextId contextId) {
private static boolean isPartOfPIT(
SearchRequest request,
ShardSearchContextId contextId,
NamedWriteableRegistry namedWriteableRegistry
) {
final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder();
if (pointInTimeBuilder != null) {
return request.pointInTimeBuilder().getSearchContextId(null).contains(contextId);
return request.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry).contains(contextId);
} else {
return false;
}
Expand Down Expand Up @@ -546,7 +550,8 @@ private void onNodeQueryFailure(Exception e, NodeQueryRequest request, CanMatchP
static void registerNodeSearchAction(
SearchTransportService searchTransportService,
SearchService searchService,
SearchPhaseController searchPhaseController
SearchPhaseController searchPhaseController,
NamedWriteableRegistry namedWriteableRegistry
) {
var transportService = searchTransportService.transportService();
var threadPool = transportService.getThreadPool();
Expand Down Expand Up @@ -576,7 +581,8 @@ static void registerNodeSearchAction(
request,
cancellableTask,
channel,
dependencies
dependencies,
namedWriteableRegistry
);
// TODO: log activating or otherwise limiting parallelism might be helpful here
for (int i = 0; i < workers; i++) {
Expand All @@ -587,12 +593,17 @@ static void registerNodeSearchAction(
TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new);
}

private static void releaseLocalContext(SearchService searchService, NodeQueryRequest request, SearchPhaseResult result) {
private static void releaseLocalContext(
SearchService searchService,
NodeQueryRequest request,
SearchPhaseResult result,
NamedWriteableRegistry namedWriteableRegistry
) {
var phaseResult = result.queryResult() != null ? result.queryResult() : result.rankFeatureResult();
if (phaseResult != null
&& phaseResult.hasSearchContext()
&& request.searchRequest.scroll() == null
&& isPartOfPIT(request.searchRequest, phaseResult.getContextId()) == false) {
&& isPartOfPIT(request.searchRequest, phaseResult.getContextId(), namedWriteableRegistry) == false) {
searchService.freeReaderContext(phaseResult.getContextId());
}
}
Expand Down Expand Up @@ -736,13 +747,15 @@ private static final class QueryPerNodeState {
private final CountDown countDown;
private final TransportChannel channel;
private volatile BottomSortValuesCollector bottomSortCollector;
private final NamedWriteableRegistry namedWriteableRegistry;

private QueryPerNodeState(
QueryPhaseResultConsumer queryPhaseResultConsumer,
NodeQueryRequest searchRequest,
CancellableTask task,
TransportChannel channel,
Dependencies dependencies
Dependencies dependencies,
NamedWriteableRegistry namedWriteableRegistry
) {
this.queryPhaseResultConsumer = queryPhaseResultConsumer;
this.searchRequest = searchRequest;
Expand All @@ -752,6 +765,7 @@ private QueryPerNodeState(
this.countDown = new CountDown(queryPhaseResultConsumer.getNumShards());
this.channel = channel;
this.dependencies = dependencies;
this.namedWriteableRegistry = namedWriteableRegistry;
}

void onShardDone() {
Expand All @@ -762,7 +776,7 @@ void onShardDone() {
try (queryPhaseResultConsumer) {
var failure = queryPhaseResultConsumer.failure.get();
if (failure != null) {
handleMergeFailure(failure, channelListener);
handleMergeFailure(failure, channelListener, namedWriteableRegistry);
return;
}
final QueryPhaseResultConsumer.MergeResult mergeResult;
Expand All @@ -772,7 +786,7 @@ void onShardDone() {
EMPTY_PARTIAL_MERGE_RESULT
);
} catch (Exception e) {
handleMergeFailure(e, channelListener);
handleMergeFailure(e, channelListener, namedWriteableRegistry);
return;
}
// translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments,
Expand All @@ -799,7 +813,7 @@ void onShardDone() {
&& q.hasSuggestHits() == false
&& q.getRankShardResult() == null
&& searchRequest.searchRequest.scroll() == null
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
&& isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegistry) == false) {
if (dependencies.searchService.freeReaderContext(q.getContextId())) {
q.clearContextId();
}
Expand All @@ -816,9 +830,20 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId()) == false) {
}
}

private void handleMergeFailure(Exception e, ChannelActionListener<TransportResponse> channelListener) {
private void handleMergeFailure(
Exception e,
ChannelActionListener<TransportResponse> channelListener,
NamedWriteableRegistry namedWriteableRegistry
) {
queryPhaseResultConsumer.getSuccessfulResults()
.forEach(searchPhaseResult -> releaseLocalContext(dependencies.searchService, searchRequest, searchPhaseResult));
.forEach(
searchPhaseResult -> releaseLocalContext(
dependencies.searchService,
searchRequest,
searchPhaseResult,
namedWriteableRegistry
)
);
channelListener.onFailure(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,12 @@ public TransportSearchAction(
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();
SearchTransportService.registerRequestHandler(transportService, searchService);
SearchQueryThenFetchAsyncAction.registerNodeSearchAction(searchTransportService, searchService, searchPhaseController);
SearchQueryThenFetchAsyncAction.registerNodeSearchAction(
searchTransportService,
searchService,
searchPhaseController,
namedWriteableRegistry
);
this.clusterService = clusterService;
this.transportService = transportService;
this.searchService = searchService;
Expand Down