1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

- [Semantic Field] Support the sparse two phase processor for the semantic field.
- [Stats] Add stats for agentic query and agentic query translator processor.
- [Performance Improvement] Introduce QueryCollectorContextSpec in hybrid query to improve search performance.

### Bug Fixes

@@ -29,6 +29,8 @@
import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder;
import org.opensearch.neuralsearch.query.NeuralKNNQueryBuilder;
import org.opensearch.neuralsearch.query.AgenticSearchQueryBuilder;
import org.opensearch.neuralsearch.search.collector.HybridQueryCollectorContextSpecFactory;
import org.opensearch.neuralsearch.search.query.HybridQueryPhaseSearcher;
import org.opensearch.neuralsearch.rest.RestNeuralSparseClearCacheHandler;
import org.opensearch.neuralsearch.rest.RestNeuralSparseWarmupHandler;
import org.opensearch.neuralsearch.settings.NeuralSearchSettingsAccessor;
@@ -40,6 +42,8 @@
import org.opensearch.neuralsearch.mappingtransformer.SemanticMappingTransformer;
import org.opensearch.neuralsearch.processor.factory.SemanticFieldProcessorFactory;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.search.query.QueryCollectorContextSpecFactory;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.transport.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
@@ -83,7 +87,6 @@
import org.opensearch.neuralsearch.processor.rerank.RerankProcessor;
import org.opensearch.neuralsearch.query.ext.RerankSearchExtBuilder;
import org.opensearch.neuralsearch.rest.RestNeuralStatsAction;
import org.opensearch.neuralsearch.search.query.HybridQueryPhaseSearcher;
import org.opensearch.neuralsearch.transport.NeuralStatsAction;
import org.opensearch.neuralsearch.transport.NeuralStatsTransportAction;
import org.opensearch.neuralsearch.transport.NeuralSparseClearCacheAction;
@@ -106,7 +109,6 @@
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;
@@ -348,4 +350,9 @@ public Map<String, Processor.Factory> getSystemIngestProcessors(Processor.Parame
)
);
}

@Override
public List<QueryCollectorContextSpecFactory> getCollectorContextSpecFactories() {
return List.of(new HybridQueryCollectorContextSpecFactory());
}
}
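
For orientation, here is a minimal sketch of how the query phase could consume the factories a plugin registers through `getCollectorContextSpecFactories()`: the first factory that returns a non-empty spec supplies the collector context for the request. The `CollectorContextSpecResolver` class below is a hypothetical illustration, not OpenSearch core code; only the factory interface shown in this diff is assumed.

```java
// Hypothetical helper, not part of this PR: sketches how registered
// QueryCollectorContextSpecFactory instances could be consulted during the query phase.
import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.lucene.search.Query;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.QueryCollectorArguments;
import org.opensearch.search.query.QueryCollectorContextSpec;
import org.opensearch.search.query.QueryCollectorContextSpecFactory;

final class CollectorContextSpecResolver {

    // Returns the first spec offered by any registered factory, if one applies to this query.
    static Optional<QueryCollectorContextSpec> resolve(
        List<QueryCollectorContextSpecFactory> factories,
        SearchContext searchContext,
        Query query,
        QueryCollectorArguments arguments
    ) throws IOException {
        for (QueryCollectorContextSpecFactory factory : factories) {
            Optional<QueryCollectorContextSpec> spec = factory.createQueryCollectorContextSpec(searchContext, query, arguments);
            if (spec.isPresent()) {
                return spec;
            }
        }
        return Optional.empty();
    }
}
```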
HybridQueryCollectorContextSpec.java (new file)
@@ -0,0 +1,63 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.search.collector;

import lombok.extern.log4j.Log4j2;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Query;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.neuralsearch.search.query.HybridCollectorManager;
import org.opensearch.neuralsearch.search.query.HybridCollectorResultsUtilParams;
import org.opensearch.neuralsearch.search.query.util.HybridSearchCollectorResultUtil;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.QueryCollectorContextSpec;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;

import static org.opensearch.search.profile.query.CollectorResult.REASON_SEARCH_TOP_HITS;

/**
* Creates the hybrid collector manager and injects the hybrid collector into the query phase to allow custom scoring and top docs collection
*/
@Log4j2
public class HybridQueryCollectorContextSpec implements QueryCollectorContextSpec {
private final HybridCollectorManager collectorManager;
private final HybridSearchCollector collector;
private final SearchContext searchContext;

public HybridQueryCollectorContextSpec(final SearchContext searchContext, final Query query) {
this.searchContext = searchContext;
this.collectorManager = (HybridCollectorManager) HybridCollectorManager.createHybridCollectorManager(searchContext, query);
this.collector = (HybridSearchCollector) collectorManager.newCollector();
}

@Override
public String getContextName() {
return REASON_SEARCH_TOP_HITS;
}

@Override
public Collector create(Collector in) throws IOException {
return collector;
}

@Override
public CollectorManager<?, ReduceableSearchResult> createManager(CollectorManager<?, ReduceableSearchResult> in) {
return collectorManager;
}

@Override
public void postProcess(QuerySearchResult result) throws IOException {
HybridSearchCollectorResultUtil hybridSearchCollectorResultUtil = new HybridSearchCollectorResultUtil(
new HybridCollectorResultsUtilParams.Builder().searchContext(searchContext).build(),
collector
);
TopDocsAndMaxScore topDocsAndMaxScore = hybridSearchCollectorResultUtil.getTopDocsAndMaxScore();
hybridSearchCollectorResultUtil.reduceCollectorResults(result, topDocsAndMaxScore);
}
}
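
To make the lifecycle concrete, here is a hypothetical driver sketch, not part of this PR (the real invocation happens inside OpenSearch core's query phase): construct the spec for a hybrid query, search with the collector manager it supplies, then fold the collected hybrid results into the shard-level `QuerySearchResult` via `postProcess`.

```java
// Hypothetical walkthrough, not part of this PR: shows the order in which the spec's
// hooks are expected to fire for a hybrid query.
import java.io.IOException;

import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Query;
import org.opensearch.neuralsearch.search.collector.HybridQueryCollectorContextSpec;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.ReduceableSearchResult;

final class HybridCollectionWalkthrough {

    static void runHybridCollection(SearchContext searchContext, Query query) throws IOException {
        // Construction builds the hybrid collector manager and a fresh hybrid collector.
        HybridQueryCollectorContextSpec spec = new HybridQueryCollectorContextSpec(searchContext, query);

        // The query phase searches with the manager supplied by the spec; this spec ignores the
        // wrapped manager, so null is passed purely for the sketch.
        CollectorManager<?, ReduceableSearchResult> manager = spec.createManager(null);

        // ... the index searcher collects hits through `manager` here ...

        // Afterwards the spec folds the hybrid top docs and max score into the shard-level result.
        spec.postProcess(searchContext.queryResult());
    }
}
```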
HybridQueryCollectorContextSpecFactory.java (new file)
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.neuralsearch.search.collector;

import lombok.extern.log4j.Log4j2;
import org.apache.lucene.search.Query;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.QueryCollectorArguments;
import org.opensearch.search.query.QueryCollectorContextSpec;
import org.opensearch.search.query.QueryCollectorContextSpecFactory;

import java.util.Optional;

import static org.opensearch.neuralsearch.util.HybridQueryUtil.isHybridQuery;

/**
* Factory class for HybridQueryCollectorContextSpec. For a hybrid query, it creates the spec, which is then retrieved in the QueryPhase.
*/
@Log4j2
public class HybridQueryCollectorContextSpecFactory implements QueryCollectorContextSpecFactory {

@Override
public Optional<QueryCollectorContextSpec> createQueryCollectorContextSpec(
SearchContext searchContext,
Query query,
QueryCollectorArguments queryCollectorArguments
) {
if (isHybridQuery(query, searchContext)) {
return Optional.of(new HybridQueryCollectorContextSpec(searchContext, query));
}
return Optional.empty();
}
}
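
And a minimal call-site sketch for the factory itself (hypothetical, not part of this PR; the search context, query, and collector arguments are assumed to come from the query phase): a spec is produced only when `isHybridQuery` matches, so every other query keeps the default collector context handling.

```java
// Hypothetical call site, not part of this PR; parameters are assumed to be supplied by the query phase.
import java.io.IOException;
import java.util.Optional;

import org.apache.lucene.search.Query;
import org.opensearch.neuralsearch.search.collector.HybridQueryCollectorContextSpecFactory;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.QueryCollectorArguments;
import org.opensearch.search.query.QueryCollectorContextSpec;

final class HybridSpecDispatchExample {

    static void dispatch(SearchContext searchContext, Query query, QueryCollectorArguments arguments) throws IOException {
        HybridQueryCollectorContextSpecFactory factory = new HybridQueryCollectorContextSpecFactory();

        // Present only for hybrid queries; any other query falls back to the default collector context.
        Optional<QueryCollectorContextSpec> maybeSpec = factory.createQueryCollectorContextSpec(searchContext, query, arguments);

        maybeSpec.ifPresent(spec -> System.out.println("hybrid collector context selected: " + spec.getContextName()));
    }
}
```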
HybridAggregationProcessor.java
@@ -5,74 +5,40 @@
package org.opensearch.neuralsearch.search.query;

import lombok.AllArgsConstructor;
import org.apache.lucene.search.CollectorManager;
import org.opensearch.search.aggregations.AggregationInitializationException;
import lombok.extern.log4j.Log4j2;
import org.opensearch.search.aggregations.AggregationProcessor;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.query.QueryPhaseExecutionException;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.List;

import static org.opensearch.neuralsearch.util.HybridQueryUtil.isHybridQuery;

/**
* Defines logic for pre- and post-phases of document scores collection. Responsible for registering custom
* collector manager for hybris query (pre phase) and reducing results (post phase)
* Defines logic for the pre- and post-phases of the hybrid query aggregation processor.
*/
@AllArgsConstructor
@Log4j2
public class HybridAggregationProcessor implements AggregationProcessor {

private final AggregationProcessor delegateAggsProcessor;

@Override
public void preProcess(SearchContext context) {
// Simply delegate the call
delegateAggsProcessor.preProcess(context);

if (isHybridQuery(context.query(), context)) {
// adding collector manager for hybrid query
CollectorManager collectorManager;
try {
collectorManager = HybridCollectorManager.createHybridCollectorManager(context);
} catch (IOException exception) {
throw new AggregationInitializationException("could not initialize hybrid aggregation processor", exception);
}
context.queryCollectorManagers().put(HybridCollectorManager.class, collectorManager);
}
}

@Override
public void postProcess(SearchContext context) {
if (isHybridQuery(context.query(), context)) {
// for case when concurrent search is not enabled (default as of 2.12 release) reduce for collector
// managers is not called
// (https://github.com/opensearch-project/OpenSearch/blob/2.12/server/src/main/java/org/opensearch/search/query/QueryPhase.java#L333-L373)
// and we have to call it manually. This is required as we format final
// result of hybrid query in {@link HybridTopScoreCollector#reduce}
// when concurrent search is enabled then reduce method is called as part of the search {@see
// ConcurrentQueryPhaseSearcher#searchWithCollectorManager}
// corresponding call in Lucene
// https://github.com/apache/lucene/blob/branch_9_10/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java#L700
if (!context.shouldUseConcurrentSearch()) {
reduceCollectorResults(context);
}
// In case of a hybrid query on a single shard, the normalization process runs after the fetch phase execution.
// The fetch phase runs right after the query phase and therefore needs the correct number of docIds to be loaded.
// As we add delimiters in the top docs to segregate the results of the individual sub-queries,
// the correct size is calculated from the scoreDocs length present in the topDocs.
updateQueryResult(context.queryResult(), context);
}

delegateAggsProcessor.postProcess(context);
}

private void reduceCollectorResults(SearchContext context) {
CollectorManager<?, ReduceableSearchResult> collectorManager = context.queryCollectorManagers().get(HybridCollectorManager.class);
try {
collectorManager.reduce(List.of()).reduce(context.queryResult());
} catch (IOException e) {
throw new QueryPhaseExecutionException(context.shardTarget(), "failed to execute hybrid query aggregation processor", e);
}
}

private void updateQueryResult(final QuerySearchResult queryResult, final SearchContext searchContext) {
boolean isSingleShard = searchContext.numberOfShards() == 1;
if (isSingleShard) {