-
Notifications
You must be signed in to change notification settings - Fork 65
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Backport 2.x] Added Score Normalization and Combination feature, man…
…ual backport (#263) * Added Score Normalization and Combination feature (#241) Signed-off-by: Martin Gaievski <gaievski@amazon.com>
- Loading branch information
1 parent
adf3925
commit 185050a
Showing
58 changed files
with
8,556 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
122 changes: 122 additions & 0 deletions
122
src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.neuralsearch.processor; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
import org.opensearch.action.search.QueryPhaseResultConsumer; | ||
import org.opensearch.action.search.SearchPhaseContext; | ||
import org.opensearch.action.search.SearchPhaseName; | ||
import org.opensearch.action.search.SearchPhaseResults; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; | ||
import org.opensearch.neuralsearch.search.CompoundTopDocs; | ||
import org.opensearch.search.SearchPhaseResult; | ||
import org.opensearch.search.internal.SearchContext; | ||
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
|
||
/** | ||
* Processor for score normalization and combination on post query search results. Updates query results with | ||
* normalized and combined scores for next phase (typically it's FETCH) | ||
*/ | ||
@Log4j2 | ||
@AllArgsConstructor | ||
public class NormalizationProcessor implements SearchPhaseResultsProcessor { | ||
public static final String TYPE = "normalization-processor"; | ||
|
||
private final String tag; | ||
private final String description; | ||
private final ScoreNormalizationTechnique normalizationTechnique; | ||
private final ScoreCombinationTechnique combinationTechnique; | ||
private final NormalizationProcessorWorkflow normalizationWorkflow; | ||
|
||
/** | ||
* Method abstracts functional aspect of score normalization and score combination. Exact methods for each processing stage | ||
* are set as part of class constructor | ||
* @param searchPhaseResult {@link SearchPhaseResults} DTO that has query search results. Results will be mutated as part of this method execution | ||
* @param searchPhaseContext {@link SearchContext} | ||
*/ | ||
@Override | ||
public <Result extends SearchPhaseResult> void process( | ||
final SearchPhaseResults<Result> searchPhaseResult, | ||
final SearchPhaseContext searchPhaseContext | ||
) { | ||
if (shouldSkipProcessor(searchPhaseResult)) { | ||
log.debug("Query results are not compatible with normalization processor"); | ||
return; | ||
} | ||
List<QuerySearchResult> querySearchResults = getQueryPhaseSearchResults(searchPhaseResult); | ||
normalizationWorkflow.execute(querySearchResults, normalizationTechnique, combinationTechnique); | ||
} | ||
|
||
@Override | ||
public SearchPhaseName getBeforePhase() { | ||
return SearchPhaseName.QUERY; | ||
} | ||
|
||
@Override | ||
public SearchPhaseName getAfterPhase() { | ||
return SearchPhaseName.FETCH; | ||
} | ||
|
||
@Override | ||
public String getType() { | ||
return TYPE; | ||
} | ||
|
||
@Override | ||
public String getTag() { | ||
return tag; | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
return description; | ||
} | ||
|
||
@Override | ||
public boolean isIgnoreFailure() { | ||
return false; | ||
} | ||
|
||
private <Result extends SearchPhaseResult> boolean shouldSkipProcessor(SearchPhaseResults<Result> searchPhaseResult) { | ||
if (Objects.isNull(searchPhaseResult) || !(searchPhaseResult instanceof QueryPhaseResultConsumer)) { | ||
return true; | ||
} | ||
|
||
QueryPhaseResultConsumer queryPhaseResultConsumer = (QueryPhaseResultConsumer) searchPhaseResult; | ||
Optional<SearchPhaseResult> optionalSearchPhaseResult = queryPhaseResultConsumer.getAtomicArray() | ||
.asList() | ||
.stream() | ||
.filter(Objects::nonNull) | ||
.findFirst(); | ||
return isNotHybridQuery(optionalSearchPhaseResult); | ||
} | ||
|
||
private boolean isNotHybridQuery(final Optional<SearchPhaseResult> optionalSearchPhaseResult) { | ||
return optionalSearchPhaseResult.isEmpty() | ||
|| Objects.isNull(optionalSearchPhaseResult.get().queryResult()) | ||
|| Objects.isNull(optionalSearchPhaseResult.get().queryResult().topDocs()) | ||
|| !(optionalSearchPhaseResult.get().queryResult().topDocs().topDocs instanceof CompoundTopDocs); | ||
} | ||
|
||
private <Result extends SearchPhaseResult> List<QuerySearchResult> getQueryPhaseSearchResults( | ||
final SearchPhaseResults<Result> results | ||
) { | ||
return results.getAtomicArray() | ||
.asList() | ||
.stream() | ||
.map(result -> result == null ? null : result.queryResult()) | ||
.collect(Collectors.toList()); | ||
} | ||
} |
88 changes: 88 additions & 0 deletions
88
src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflow.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.neuralsearch.processor; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.stream.Collectors; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
import org.opensearch.common.lucene.search.TopDocsAndMaxScore; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombiner; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizer; | ||
import org.opensearch.neuralsearch.search.CompoundTopDocs; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
|
||
/** | ||
* Class abstracts steps required for score normalization and combination, this includes pre-processing of incoming data | ||
* and post-processing of final results | ||
*/ | ||
@AllArgsConstructor | ||
@Log4j2 | ||
public class NormalizationProcessorWorkflow { | ||
|
||
private final ScoreNormalizer scoreNormalizer; | ||
private final ScoreCombiner scoreCombiner; | ||
|
||
/** | ||
* Start execution of this workflow | ||
* @param querySearchResults input data with QuerySearchResult from multiple shards | ||
* @param normalizationTechnique technique for score normalization | ||
* @param combinationTechnique technique for score combination | ||
*/ | ||
public void execute( | ||
final List<QuerySearchResult> querySearchResults, | ||
final ScoreNormalizationTechnique normalizationTechnique, | ||
final ScoreCombinationTechnique combinationTechnique | ||
) { | ||
// pre-process data | ||
log.debug("Pre-process query results"); | ||
List<CompoundTopDocs> queryTopDocs = getQueryTopDocs(querySearchResults); | ||
|
||
// normalize | ||
log.debug("Do score normalization"); | ||
scoreNormalizer.normalizeScores(queryTopDocs, normalizationTechnique); | ||
|
||
// combine | ||
log.debug("Do score combination"); | ||
scoreCombiner.combineScores(queryTopDocs, combinationTechnique); | ||
|
||
// post-process data | ||
log.debug("Post-process query results after score normalization and combination"); | ||
updateOriginalQueryResults(querySearchResults, queryTopDocs); | ||
} | ||
|
||
/** | ||
* Getting list of CompoundTopDocs from list of QuerySearchResult. Each CompoundTopDocs is for individual shard | ||
* @param querySearchResults collection of QuerySearchResult for all shards | ||
* @return collection of CompoundTopDocs, one object for each shard | ||
*/ | ||
private List<CompoundTopDocs> getQueryTopDocs(final List<QuerySearchResult> querySearchResults) { | ||
List<CompoundTopDocs> queryTopDocs = querySearchResults.stream() | ||
.filter(searchResult -> Objects.nonNull(searchResult.topDocs())) | ||
.filter(searchResult -> searchResult.topDocs().topDocs instanceof CompoundTopDocs) | ||
.map(searchResult -> (CompoundTopDocs) searchResult.topDocs().topDocs) | ||
.collect(Collectors.toList()); | ||
return queryTopDocs; | ||
} | ||
|
||
private void updateOriginalQueryResults(final List<QuerySearchResult> querySearchResults, final List<CompoundTopDocs> queryTopDocs) { | ||
for (int i = 0; i < querySearchResults.size(); i++) { | ||
QuerySearchResult querySearchResult = querySearchResults.get(i); | ||
if (!(querySearchResult.topDocs().topDocs instanceof CompoundTopDocs) || Objects.isNull(queryTopDocs.get(i))) { | ||
continue; | ||
} | ||
CompoundTopDocs updatedTopDocs = queryTopDocs.get(i); | ||
float maxScore = updatedTopDocs.totalHits.value > 0 ? updatedTopDocs.scoreDocs[0].score : 0.0f; | ||
TopDocsAndMaxScore updatedTopDocsAndMaxScore = new TopDocsAndMaxScore(updatedTopDocs, maxScore); | ||
querySearchResult.topDocs(updatedTopDocsAndMaxScore, null); | ||
} | ||
} | ||
} |
Oops, something went wrong.