Skip to content

Commit 3ba3f7e

Browse files
navneet1vmsfrohandrross
authored andcommitted
Adding the SearchPhaseResultsProcessor interface in Search Pipeline (opensearch-project#7283)
* Initial code for adding the SearchPhaseInjectorProcessor interface in Search Pipeline Signed-off-by: Navneet Verma <navneev@amazon.com> * Pass PipelinedRequest to SearchAsyncActions We should resolve a search pipeline once at the start of a search request and then propagate that pipeline through the async actions. When completing a search phase, we will then use that pipeline to inject behavior (if applicable). Signed-off-by: Michael Froh <froh@amazon.com> * Renamed SearchPhaseInjectorProcessor to SearchPhaseResultsProcessor and fixed the comments Signed-off-by: Navneet Verma <navneev@amazon.com> * Make PipelinedSearchRequest extend SearchRequest Rather than wrapping a SearchRequest in a PipelinedSearchRequest, changes are less intrusive if we say that a PipelinedSearchRequest "is a" SearchRequest. Signed-off-by: Michael Froh <froh@amazon.com> * Revert code change from merge conflict Signed-off-by: Michael Froh <froh@amazon.com> * Updated the changelog with more appropiate wording for the change. Signed-off-by: Navneet Verma <navneev@amazon.com> * Fixed Typos in the code Signed-off-by: Navneet Verma <navneev@amazon.com> * Fixing comments relating to return of SearchPhaseResults from processor Signed-off-by: Navneet Verma <navneev@amazon.com> * Moved SearchPhaseName enum in separate class and fixed comments. Signed-off-by: Navneet Verma <navneev@amazon.com> * Resolve remaining merge conflict Signed-off-by: Michael Froh <froh@amazon.com> --------- Signed-off-by: Navneet Verma <navneev@amazon.com> Signed-off-by: Michael Froh <froh@amazon.com> Co-authored-by: Michael Froh <froh@amazon.com> Co-authored-by: Andrew Ross <andrross@amazon.com> Signed-off-by: Shivansh Arora <hishiv@amazon.com>
1 parent f893136 commit 3ba3f7e

22 files changed

+402
-46
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8080

8181
## [Unreleased 2.x]
8282
### Added
83+
- [SearchPipeline] Add new search pipeline processor type, SearchPhaseResultsProcessor, that can modify the result of one search phase before starting the next phase.([#7283](https://github.com/opensearch-project/OpenSearch/pull/7283))
8384
- Add task cancellation monitoring service ([#7642](https://github.com/opensearch-project/OpenSearch/pull/7642))
8485
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
8586
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))

modules/search-pipeline-common/src/yamlRestTest/resources/rest-api-spec/test/search_pipeline/50_script_processor.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ teardown:
3939
{
4040
"script" : {
4141
"lang" : "painless",
42-
"source" : "ctx._source['size'] += 10; ctx._source['from'] -= 1; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];"
42+
"source" : "ctx._source['size'] += 10; ctx._source['from'] = ctx._source['from'] <= 0 ? ctx._source['from'] : ctx._source['from'] - 1 ; ctx._source['explain'] = !ctx._source['explain']; ctx._source['version'] = !ctx._source['version']; ctx._source['seq_no_primary_term'] = !ctx._source['seq_no_primary_term']; ctx._source['track_scores'] = !ctx._source['track_scores']; ctx._source['track_total_hits'] = 1; ctx._source['min_score'] -= 0.9; ctx._source['terminate_after'] += 2; ctx._source['profile'] = !ctx._source['profile'];"
4343
}
4444
}
4545
]

server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.opensearch.search.internal.InternalSearchResponse;
5858
import org.opensearch.search.internal.SearchContext;
5959
import org.opensearch.search.internal.ShardSearchRequest;
60+
import org.opensearch.search.pipeline.PipelinedRequest;
6061
import org.opensearch.transport.Transport;
6162

6263
import java.util.ArrayDeque;
@@ -696,7 +697,11 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
696697
* @see #onShardResult(SearchPhaseResult, SearchShardIterator)
697698
*/
698699
final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim()
699-
executeNextPhase(this, getNextPhase(results, this));
700+
final SearchPhase nextPhase = getNextPhase(results, this);
701+
if (request instanceof PipelinedRequest && nextPhase != null) {
702+
((PipelinedRequest) request).transformSearchPhaseResults(results, this, this.getName(), nextPhase.getName());
703+
}
704+
executeNextPhase(this, nextPhase);
700705
}
701706

702707
@Override

server/src/main/java/org/opensearch/action/search/ArraySearchPhaseResults.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ boolean hasResult(int shardIndex) {
6666
}
6767

6868
@Override
69-
AtomicArray<Result> getAtomicArray() {
69+
public AtomicArray<Result> getAtomicArray() {
7070
return results;
7171
}
7272
}

server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
9494
) {
9595
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
9696
super(
97-
"can_match",
97+
SearchPhaseName.CAN_MATCH.getName(),
9898
logger,
9999
searchTransportService,
100100
nodeIdToConnection,

server/src/main/java/org/opensearch/action/search/DfsQueryPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ final class DfsQueryPhase extends SearchPhase {
6969
Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
7070
SearchPhaseContext context
7171
) {
72-
super("dfs_query");
72+
super(SearchPhaseName.DFS_QUERY.getName());
7373
this.progressListener = context.getTask().getProgressListener();
7474
this.queryResult = queryResult;
7575
this.searchResults = searchResults;

server/src/main/java/org/opensearch/action/search/ExpandSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ final class ExpandSearchPhase extends SearchPhase {
6262
private final AtomicArray<SearchPhaseResult> queryResults;
6363

6464
ExpandSearchPhase(SearchPhaseContext context, InternalSearchResponse searchResponse, AtomicArray<SearchPhaseResult> queryResults) {
65-
super("expand");
65+
super(SearchPhaseName.EXPAND.getName());
6666
this.context = context;
6767
this.searchResponse = searchResponse;
6868
this.queryResults = queryResults;

server/src/main/java/org/opensearch/action/search/FetchSearchPhase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ final class FetchSearchPhase extends SearchPhase {
9292
SearchPhaseContext context,
9393
BiFunction<InternalSearchResponse, AtomicArray<SearchPhaseResult>, SearchPhase> nextPhaseFactory
9494
) {
95-
super("fetch");
95+
super(SearchPhaseName.FETCH.getName());
9696
if (context.getNumShards() != resultConsumer.getNumShards()) {
9797
throw new IllegalStateException(
9898
"number of shards must match the length of the query results but doesn't:"

server/src/main/java/org/opensearch/action/search/SearchPhase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.opensearch.common.CheckedRunnable;
3535

3636
import java.io.IOException;
37+
import java.util.Locale;
3738
import java.util.Objects;
3839

3940
/**
@@ -54,4 +55,13 @@ protected SearchPhase(String name) {
5455
public String getName() {
5556
return name;
5657
}
58+
59+
/**
60+
* Returns the SearchPhase name as {@link SearchPhaseName}. Exception will come if SearchPhase name is not defined
61+
* in {@link SearchPhaseName}
62+
* @return {@link SearchPhaseName}
63+
*/
64+
public SearchPhaseName getSearchPhaseName() {
65+
return SearchPhaseName.valueOf(name.toUpperCase(Locale.ROOT));
66+
}
5767
}

server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
*
5151
* @opensearch.internal
5252
*/
53-
interface SearchPhaseContext extends Executor {
53+
public interface SearchPhaseContext extends Executor {
5454
// TODO maybe we can make this concrete later - for now we just implement this in the base class for all initial phases
5555

5656
/**

0 commit comments

Comments
 (0)