Skip to content

Commit 496bb9e

Browse files
authored
Add a listener to track the progress of a search request locally (#49471) (#49691)
This commit adds a function in NodeClient that allows to track the progress of a search request locally. Progress is tracked through a SearchProgressListener that exposes query and fetch responses as well as partial and final reduces. This new method can be used by modules/plugins inside a node in order to track the progress of a local search request. Relates #49091
1 parent 2dafecc commit 496bb9e

39 files changed

+923
-393
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.Nullable;
3535
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3636
import org.elasticsearch.common.util.concurrent.AtomicArray;
37+
import org.elasticsearch.index.shard.ShardId;
3738
import org.elasticsearch.search.SearchPhaseResult;
3839
import org.elasticsearch.search.SearchShardTarget;
3940
import org.elasticsearch.search.internal.AliasFilter;
@@ -87,7 +88,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
8788
private final SearchResponse.Clusters clusters;
8889

8990
private final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
90-
private final GroupShardsIterator<SearchShardIterator> shardsIts;
91+
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
9192
private final int expectedTotalOps;
9293
private final AtomicInteger totalOps = new AtomicInteger();
9394
private final int maxConcurrentRequestsPerNode;
@@ -381,6 +382,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
381382
logger.trace(new ParameterizedMessage("{}: Failed to execute [{}]", shard, request), e);
382383
}
383384
}
385+
onShardGroupFailure(shardIndex, e);
384386
onPhaseDone();
385387
} else {
386388
final ShardRouting nextShard = shardIt.nextOrNull();
@@ -389,7 +391,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
389391
logger.trace(() -> new ParameterizedMessage(
390392
"{}: Failed to execute [{}] lastShard [{}]",
391393
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
392-
if (!lastShard) {
394+
if (lastShard == false) {
393395
performPhaseOnShard(shardIndex, shardIt, nextShard);
394396
} else {
395397
// no more shards active, add a failure
@@ -400,10 +402,19 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
400402
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
401403
}
402404
}
405+
onShardGroupFailure(shardIndex, e);
403406
}
404407
}
405408
}
406409

410+
/**
411+
* Executed once for every {@link ShardId} that failed on all available shard routing.
412+
*
413+
* @param shardIndex the shard target that failed
414+
* @param exc the final failure reason
415+
*/
416+
protected void onShardGroupFailure(int shardIndex, Exception exc) {}
417+
407418
/**
408419
* Executed once for every failed shard level request. This method is invoked before the next replica is tried for the given
409420
* shard target.

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.util.concurrent.AtomicArray;
2323
import org.elasticsearch.search.SearchPhaseResult;
2424
import org.elasticsearch.search.SearchShardTarget;
25+
import org.elasticsearch.search.builder.SearchSourceBuilder;
2526
import org.elasticsearch.search.dfs.AggregatedDfs;
2627
import org.elasticsearch.search.dfs.DfsSearchResult;
2728
import org.elasticsearch.search.query.QuerySearchRequest;
@@ -46,13 +47,15 @@ final class DfsQueryPhase extends SearchPhase {
4647
private final Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
4748
private final SearchPhaseContext context;
4849
private final SearchTransportService searchTransportService;
50+
private final SearchProgressListener progressListener;
4951

5052
DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults,
5153
SearchPhaseController searchPhaseController,
5254
Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
5355
SearchPhaseContext context) {
5456
super("dfs_query");
55-
this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards());
57+
this.progressListener = context.getTask().getProgressListener();
58+
this.queryResult = searchPhaseController.newSearchPhaseResults(progressListener, context.getRequest(), context.getNumShards());
5659
this.searchPhaseController = searchPhaseController;
5760
this.dfsSearchResults = dfsSearchResults;
5861
this.nextPhaseFactory = nextPhaseFactory;
@@ -69,6 +72,8 @@ public void run() throws IOException {
6972
final CountedCollector<SearchPhaseResult> counter = new CountedCollector<>(queryResult::consumeResult,
7073
resultList.size(),
7174
() -> context.executeNextPhase(this, nextPhaseFactory.apply(queryResult)), context);
75+
final SearchSourceBuilder sourceBuilder = context.getRequest().source();
76+
progressListener.notifyListShards(progressListener.searchShards(resultList), sourceBuilder == null || sourceBuilder.size() != 0);
7277
for (final DfsSearchResult dfsResult : resultList) {
7378
final SearchShardTarget searchShardTarget = dfsResult.getSearchShardTarget();
7479
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
@@ -92,6 +97,7 @@ public void onFailure(Exception exception) {
9297
try {
9398
context.getLogger().debug(() -> new ParameterizedMessage("[{}] Failed to execute query phase",
9499
querySearchRequest.id()), exception);
100+
progressListener.notifyQueryFailure(shardIndex, exception);
95101
counter.onFailure(shardIndex, searchShardTarget, exception);
96102
} finally {
97103
// the query might not have been executed at all (for example because thread pool rejected

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ final class FetchSearchPhase extends SearchPhase {
4949
private final SearchPhaseContext context;
5050
private final Logger logger;
5151
private final SearchPhaseResults<SearchPhaseResult> resultConsumer;
52+
private final SearchProgressListener progressListener;
5253

5354
FetchSearchPhase(SearchPhaseResults<SearchPhaseResult> resultConsumer,
5455
SearchPhaseController searchPhaseController,
@@ -72,6 +73,7 @@ final class FetchSearchPhase extends SearchPhase {
7273
this.context = context;
7374
this.logger = context.getLogger();
7475
this.resultConsumer = resultConsumer;
76+
this.progressListener = context.getTask().getProgressListener();
7577
}
7678

7779
@Override
@@ -133,6 +135,7 @@ private void innerRun() throws IOException {
133135
// we do this as we go since it will free up resources and passing on the request on the
134136
// transport layer is cheap.
135137
releaseIrrelevantSearchContext(queryResult.queryResult());
138+
progressListener.notifyFetchResult(i);
136139
}
137140
// in any case we count down this result since we don't talk to this shard anymore
138141
counter.countDown();
@@ -165,6 +168,7 @@ private void executeFetch(final int shardIndex, final SearchShardTarget shardTar
165168
@Override
166169
public void innerOnResponse(FetchSearchResult result) {
167170
try {
171+
progressListener.notifyFetchResult(shardIndex);
168172
counter.onResult(result);
169173
} catch (Exception e) {
170174
context.onPhaseFailure(FetchSearchPhase.this, "", e);
@@ -175,6 +179,7 @@ public void innerOnResponse(FetchSearchResult result) {
175179
public void onFailure(Exception e) {
176180
try {
177181
logger.debug(() -> new ParameterizedMessage("[{}] Failed to execute fetch phase", fetchSearchRequest.id()), e);
182+
progressListener.notifyFetchFailure(shardIndex, e);
178183
counter.onFailure(shardIndex, shardTarget, e);
179184
} finally {
180185
// the search context might not be cleared on the node where the fetch was executed for example

server/src/main/java/org/elasticsearch/action/search/SearchActionListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.elasticsearch.search.SearchShardTarget;
2424

2525
/**
26-
* An base action listener that ensures shard target and shard index is set on all responses
26+
* A base action listener that ensures shard target and shard index is set on all responses
2727
* received by this listener.
2828
*/
2929
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -571,19 +571,23 @@ static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<Sear
571571
private final int bufferSize;
572572
private int index;
573573
private final SearchPhaseController controller;
574+
private final SearchProgressListener progressListener;
574575
private int numReducePhases = 0;
575576
private final TopDocsStats topDocsStats;
576577
private final boolean performFinalReduce;
577578

578579
/**
579580
* Creates a new {@link QueryPhaseResultConsumer}
581+
* @param progressListener a progress listener to be notified when a successful response is received
582+
* and when a partial or final reduce has completed.
580583
* @param controller a controller instance to reduce the query response objects
581584
* @param expectedResultSize the expected number of query results. Corresponds to the number of shards queried
582585
* @param bufferSize the size of the reduce buffer. if the buffer size is smaller than the number of expected results
583586
* the buffer is used to incrementally reduce aggregation results before all shards responded.
584587
*/
585-
private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedResultSize, int bufferSize,
586-
boolean hasTopDocs, boolean hasAggs, int trackTotalHitsUpTo, boolean performFinalReduce) {
588+
private QueryPhaseResultConsumer(SearchProgressListener progressListener, SearchPhaseController controller,
589+
int expectedResultSize, int bufferSize, boolean hasTopDocs, boolean hasAggs,
590+
int trackTotalHitsUpTo, boolean performFinalReduce) {
587591
super(expectedResultSize);
588592
if (expectedResultSize != 1 && bufferSize < 2) {
589593
throw new IllegalArgumentException("buffer size must be >= 2 if there is more than one expected result");
@@ -595,6 +599,7 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
595599
throw new IllegalArgumentException("either aggs or top docs must be present");
596600
}
597601
this.controller = controller;
602+
this.progressListener = progressListener;
598603
// no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
599604
this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
600605
this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
@@ -610,6 +615,7 @@ public void consumeResult(SearchPhaseResult result) {
610615
super.consumeResult(result);
611616
QuerySearchResult queryResult = result.queryResult();
612617
consumeInternal(queryResult);
618+
progressListener.notifyQueryResult(queryResult.getShardIndex());
613619
}
614620

615621
private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
@@ -629,6 +635,10 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
629635
}
630636
numReducePhases++;
631637
index = 1;
638+
if (hasAggs) {
639+
progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()),
640+
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
641+
}
632642
}
633643
final int i = index++;
634644
if (hasAggs) {
@@ -652,8 +662,11 @@ private synchronized List<TopDocs> getRemainingTopDocs() {
652662

653663
@Override
654664
public ReducedQueryPhase reduce() {
655-
return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
656-
numReducePhases, false, performFinalReduce);
665+
ReducedQueryPhase reducePhase = controller.reducedQueryPhase(results.asList(),
666+
getRemainingAggs(), getRemainingTopDocs(), topDocsStats, numReducePhases, false, performFinalReduce);
667+
progressListener.notifyReduce(progressListener.searchShards(results.asList()),
668+
reducePhase.totalHits, reducePhase.aggregations);
669+
return reducePhase;
657670
}
658671

659672
/**
@@ -678,7 +691,9 @@ private int resolveTrackTotalHits(SearchRequest request) {
678691
/**
679692
* Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
680693
*/
681-
ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest request, int numShards) {
694+
ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressListener listener,
695+
SearchRequest request,
696+
int numShards) {
682697
SearchSourceBuilder source = request.source();
683698
boolean isScrollRequest = request.scroll() != null;
684699
final boolean hasAggs = source != null && source.aggregations() != null;
@@ -688,14 +703,24 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchRequest r
688703
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
689704
if (request.getBatchedReduceSize() < numShards) {
690705
// only use this if there are aggs and if there are more shards than we should reduce at once
691-
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
706+
return new QueryPhaseResultConsumer(listener, this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
692707
trackTotalHitsUpTo, request.isFinalReduce());
693708
}
694709
}
695710
return new ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
711+
@Override
712+
void consumeResult(SearchPhaseResult result) {
713+
super.consumeResult(result);
714+
listener.notifyQueryResult(result.queryResult().getShardIndex());
715+
}
716+
696717
@Override
697718
ReducedQueryPhase reduce() {
698-
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
719+
List<SearchPhaseResult> resultList = results.asList();
720+
final ReducedQueryPhase reducePhase =
721+
reducedQueryPhase(resultList, isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
722+
listener.notifyReduce(listener.searchShards(resultList), reducePhase.totalHits, reducePhase.aggregations);
723+
return reducePhase;
699724
}
700725
};
701726
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.search;
21+
22+
import org.elasticsearch.action.ActionListener;
23+
24+
/**
25+
* An {@link ActionListener} for search requests that allows to track progress of the {@link SearchAction}.
26+
* See {@link SearchProgressListener}.
27+
*/
28+
public abstract class SearchProgressActionListener extends SearchProgressListener implements ActionListener<SearchResponse> {
29+
}

0 commit comments

Comments
 (0)