Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Aggregations and Hybrid query #604

Closed
martin-gaievski opened this issue Feb 14, 2024 · 3 comments
Closed

[RFC] Aggregations and Hybrid query #604

martin-gaievski opened this issue Feb 14, 2024 · 3 comments
Assignees

Comments

@martin-gaievski
Copy link
Member

martin-gaievski commented Feb 14, 2024

Introduction

This document describes details of design for Aggregations in Hybrid Query. This feature has been requested by customer through GitHub issues #509 and #422.

Background

Initial implementation of Hybrid Query has minimum functionality and by design does not work with any type of existing aggregations. Queries with both “aggs” and hybrid were not blocked but results were not guaranteed.

Aggregations were planned for be added in future releases using existing architecture of Hybrid query using custom implementation of QueryPhaseSearcher. With time code pieces in core OpenSearch related to QueryPhaseSearcher were evolved and concurrent search became GA in 2.12. We need to take into account those changes when doing this design.

In context of this design we understand aggregation as a alternative approach for getting data stats. Aggregations can be used as a standalone request or together with search queries. In first case data is collected for all documents in the index, similar to case if “match_all” query is passed. When it’s bundled with the search query then results are based on the documents returned by the query. There is a special mixed case, for aggregation type global when the query results are collected but aggregation is done for all data set.

Functional Requirements

  1. User should be able to execute hybrid query along with existing aggregations using existing syntax. List of exact aggregations that will work with hybrid query will be identified.
  2. Existing functionality of Hybrid query (like sub-queries, score normalization etc.) should remain unchanged.

Non Functional Requirements

  1. Hybrid query should work when concurrent segment search feature from OpenSearch core is enabled.
  2. Latency added by aggregations should be within 10% of the latency that same aggregations are adding to core queries.
  3. Utilize existing logic/code for executing and collecting aggregation results, avoid copying logic for specific aggregations.

Document Scope

In this document we propose a solution for the questions below:

  1. How aggregation processing will be started and final results collected.
  2. How we’re keeping existing functionality of Hybrid query.
  3. Is approach extensible for adding more functionality in future.

Out of Document Scope

  1. Details of how each individual aggregation type works. Based on one of non-functional requirements we’re not changing or adding logic for computing aggregations.
  2. Any non-standard aggregation, meaning not mentioned in the list of aggregations supported by OpenSearch out of the box. They may or may not work, we’re not focusing on checking it in scope of this design.

Solution Overview

Solution is based on calling collectors that are created by code logic when executing Query phase. Different options for solution are around of how to call those collectors and compile final query results.

Certain changes in current behavior of Hybrid search query are required regardless of the option:

  • relax the limit of max hits per shard, in current implementation it’s max of size among all sub-queries. It should be max of unique docs across all sub-query results.

For example, for 1 shard we have following results:

sub_query_1 results: 
{ doc1, doc2 },
sub_query_2 results:
{ doc3, doc1, doc4}

with current logic this will be reduced to:

{ doc1, doc3, doc2 }

with proposed change it will be:

{ doc1, doc3, doc2, doc4 }

note that final result list has size of max of all sub queries (3) and one of the docs (doc4) is dropped from the final collection.

Such change is required to keep results of aggregations and hybrid query in sync. For non-global aggregation results are based on sub-set of data that is returned by the query. If some of the query results are dropped then two resulting collections will be inconsistent.

Below is continuation of the example with 2 sub-queries.
If our docs look like:

doc1 {"doc_keyword": "red"}
doc2 {"doc_keyword": "yellow"}
doc3 {"doc_keyword": "yellow"}
doc4 {"doc_keyword": "green"}

and “terms” aggregation is part of the query:

    "aggs": {
        "doc_colors": {
            "terms": {
                "field": "doc_keyword",
                "size":5
            }
        }
    }

then with current logic the final result will look like:

"hits": [
   { doc1, doc3, doc2 } 
],
"aggregations": {
    "doc_colors": {
        "buckets": [
            {
                "key": "red",
                "doc_count": 1
            },
            {
                "key": "yellow",
                "doc_count": 2
            },
            {
                "key": "green",
                "doc_count": 1
            }
        ]
    }
}

after the change the final result will look like:

"hits": [
   { doc1, doc3, doc2, doc4 } 
],
"aggregations": {
    "doc_colors": {
        "buckets": [
            {
                "key": "red",
                "doc_count": 1
            },
            {
                "key": "yellow",
                "doc_count": 2
            },
            {
                "key": "green",
                "doc_count": 1
            }
        ]
    }
}

Risks / Known limitations

  • Must be compatible with concurrent segment search in core OpenSearch

  • Depending on the option preferred change may not be fully backward compatible with existing hybrid query.
    At the moment we're planning to relax the limit of max hits per shard.

  • Some aggregations may not be supported depending on the chosen option. If fix is complex and aggregation is question is not widely used it can be blocked, query that contains such aggregation type will throw an exception.

Option 1:

    * Diversified Sampler, bucket type aggregation, https://opensearch.org/docs/latest/aggregations/bucket/diversified-sampler/
    * Sampler, bucket type aggregation, https://opensearch.org/docs/latest/aggregations/bucket/sampler/
    * Geohex, bucket type https://opensearch.org/docs/latest/aggregations/bucket/geohex-grid/

This is the list of aggregations we are checking to verify design options solution

Metric aggregations

  • Average
  • Cardinality
  • Extended stats
  • Geobounds points
  • Geobounds shape
  •   Matrix stats
  •   Maximum
  •   Minimum
  •   Percentile ranks
  •   Percentile
  •   Scripted metric
  •   Stats
  •   Sum
  •   Top hits
  •   Value count

 
Bucket aggregations

  • Adjacency matrix
  • Date histogram
  • Date range
  • Diversified sampler
  • Filter
  • Filters
  • Geodistance
  • Geohash grid, points
  • Geohash grid, shapes
  • Geohex grid
  • Geotile grid
  • Global
  • Histogram
  • IP range
  • Missing
  • Multi-terms
  • Nested
  • Range
  • Reverse nested
  • Sampler
  • Significant terms
  • Significant text
  • Terms
     
    Pipeline sibling aggregations
  • min_bucket
  • max_bucket
  • sum_bucket
  • avg_bucket
  • stats_bucket
  • extended_stats_bucket
     
    Pipeline, parent
  • bucket_script
  • bucket_selector
  • bucket_sort
  • cumulative_sum
  • derivative
  • moving_avg
  • serial_diff

Solution Details

Option 1: New Collector Manager

Pros:

Cons:

  • conflicting with concurrent search for cases when aggregation doesn't support that feature at the low level. that includes any 3rd party aggregation (e.g added with plugin) that doesn't add explicitly support for concurrent search
  • complex implementation, refactoring in hybrid search is needed
  • adding support for post filters is not feasible

We can create a new custom CollectorManager and pass it to search method that takes CollectorManager. That should replace the call to a similar search method. New method will:

  1. take CollectorManager as an argument, not Collector
  2. return Reducible result object instead of void

Advantage of such approach is that such collection manager can be stacked with other manager objects into multi collector manager object. And aggregations do have their own collector manager that when executed collects all the data needed to form final aggregation results.

Below is diagram for classes that will be affected if we go with this option.

Normalization and Score combination-Class Diagram QueryPhaseSearcher - Aggs drawio (1)

High level flow will look like below

Normalization and Score combination-Sequence diagram Collector Manager Option drawio (1)

Another zoomed in flow diagram to show how scores are collected for hybrid sub-queries and aggregations.

Normalization and Score combination-Copy of Sequence diagram Collector Manager Option drawio (1)

For the new implementation of the AggregationProcessor we need to have following logic for both pre and post processing methods:

when doing preProcess:
    if this is hybrid query OR concurrent search is enabled:
        delegate to Concurrent Aggregation processor preProcess
    else:
        delegate to Default Aggregation processor preProcess
when doing postProcess:
    if this is hybrid query OR concurrent search is enabled:
        delegate to Concurrent Aggregation processor postProcess
    else:
        delete to Default Aggregation processor postProcess

Reason for this is additional CollectorManager added for Hybrid Query. With that we need to do search with a Multi Collector Manager, in the same way as it’s currently done for the parallel segment search. For all other cases default logic will be used.

POC has been done to check feasibility of this option, repo with the POC code: https://github.com/martin-gaievski/neural-search/tree/add_aggregations_to_hybrid_query

We've found that approach isn't compatible with following aggregations for scenarios when concurrent segment search feature is enabled

  • sampler
  • diversified sampler
  • geohex grid

That's because these aggregations are explicitly set internal flag should use concurrent search to false, or use default implementation that does the same. As per core logic system must switch to a no concurrent search mode for such aggs, that must happen transparently to user.

For Hybrid search for these aggregations system return identical runtime exception response:

"illegal_state_exception","reason": "Target slice count should not be used when concurrent search is disabled"

Possible mitigation:
We can add a check and block queries with such aggregations if concurrent segment search is enabled.
For geohex grid this can be fixed easily by adding should use concurrent search flag.

Option 2: Adding existing collector to Query Collector Context

We can add aggregation collectors to the SearchContext manually and then run the post search processing that should collect results for aggregations.

Pros:

  • simple implementation
  • backward compatible from Hybrid query perspective
  • all aggregation types will be supported

Cons:

  • requires changes in core interfaces, core team is strongly against opening interfaces (PR with suggested changes in core), main concern is consistency of concurrent and non-concurrent search feature in core.

Methods that we need to change from “default” to “public” are:

Class QueryCollectorContext:

  • Collector create(Collector in)
  • void postProcess(QuerySearchResult result

Event flow will be following

get Collector Context → create new collector based on context and next collector passed to QueryPhaseSearcher → do search with searcher.search(query, collector) → for each collector context call manually postProcess

Risk is in skipping CollectorManager from the loop, in such case functionality of concurrent search is ignored.

Special cases

Case 1: Doc appears in result list for multiple sub-queries

Concern: Doc can be counted more than once by aggregation logic
Actual result: Aggregation doesn’t multiple result for one doc based on number of sub-queries with that doc.

Example:
Index has following docs

doc1 {"doc_keyword": "red", "doc_price": 10}
doc2 {"doc_keyword": "yellow", "doc_price": 20}
doc3 {"doc_keyword": "yellow", "doc_price": 15}
doc4 {"doc_keyword": "green", "doc_price": 25}
doc5 {"doc_keyword": "blue", "doc_price": 5}

hybrid query with sum and avg aggregations

"queries": [
    {"term": { "doc_keyword": "red"}},
    {
       "bool": {
          "should": [
              { "term": { "doc_keyword": "red"}},
              { "term": { "doc_keyword": "green"}} ]}},
    "aggs": {
        "sum_of_prices": {"sum": {"field": "doc_price"}},
        "avg_price": {"avg": {"field": "doc_price"}}
    }

actual result

"hits": {"total": {"value": 2},
   "hits": {doc1, doc4}
"aggregations": {
        "sum_of_prices": {
            "value": 35.0
        },
        "avg_price": {
            "value": 17.5
        }
    }   

if concern would be valid the result looks like below, where numbers for doc1 counted twice as it appears in results for both bool and term sub-queries

"hits": {"total": {"value": 2},
   "hits": {doc1, doc4}
"aggregations": {
        "sum_of_prices": {
            "value": 45.0
        },
        "avg_price": {
            "value": 22.5
        }
    } 
Case 2: Aggregation ignores all hits in all shards and is based only on a limited sub set

Concern: If size value is passed with query then all docs that are above that size and are cut from query results are also ignored by the aggregation
Actual result: All results are taking into account by aggregation processing before they are limited based on the size param

Example:
Index has following docs

doc1 {"doc_keyword": "red", "doc_price": 10}
doc2 {"doc_keyword": "yellow", "doc_price": 20}
doc3 {"doc_keyword": "yellow", "doc_price": 15}
doc4 {"doc_keyword": "green", "doc_price": 25}
doc5 {"doc_keyword": "blue", "doc_price": 5}
doc6 {"doc_keyword": "yellow", "doc_price": 11}
doc7 {"doc_keyword": "green", "doc_price": 17}
doc8 {"doc_keyword": "blue", "doc_price": 8}

hybrid query with sum and avg aggregations

"queries": [
    {"term": { "doc_keyword": "yellow"}},
    {"term": { "doc_keyword": "green"}}]},
    "aggs": {
        "sum_of_prices": {"sum": {"field": "doc_price"}},
        "avg_price": {"avg": {"field": "doc_price"}}
    },
    "size": 3

actual result

"hits": {"total": {"value": 5},
   "hits": {doc2, doc4, doc3}
"aggregations": {
        "sum_of_prices": {
            "value": 88.0
        },
        "avg_price": {
            "value": 17.6
        }
    }    

sum is calculated as:
20 (doc2) + 15 (doc3) + 11 (doc6) + 25 (doc4) + 17 (doc7) = 88.0

avg is calculated as:
sum (doc2 + doc3 + doc6 + doc4 + doc7) / 5 = 88.0/5 = 17.6

if concern would be valid the result looks like below, where numbers for doc1 counted twice as it appears in results for both bool and term sub-queries

"hits": {"total": {"value": 5},
   "hits": {doc2, doc4, doc3}
"aggregations": {
        "sum_of_prices": {
            "value": 60.0
        },
        "avg_price": {
            "value": 20.0
        }
    } 

Open Questions

  1. Can we extend approach by adding post-filter support

Currently filtering is added as one more collector and passed to QueryPhaseSearcher. In core the flow is following:

query searcher adds TopDocsCollector as a first collector →
collectors are added into one collector manager →
search is executed with collector manager, query results are collected first, then filter collector is applied on top of those results

Currently context methods are closed in core for both calling and extending

  1. Can we extend approach by adding support for sorting
    TBD

  2. What should be behavior for aggregations that are not compatible with hybrid query?
    If such a list is limited and does not include the most adopted aggregations, we can block the requests.

References

  1. High-level design for Score Normalization and Combination [RFC] High Level Approach and Design For Normalization and Score Combination #126
  2. Low level design for QueryPhaseSearcher [RFC] Low Level Design for Normalization and Score Combination Query Phase Searcher #193
  3. OpenSearch documentation on aggregations https://opensearch.org/docs/latest/aggregations/
  4. OpenSearch list of supported aggregations https://opensearch.org/docs/latest/aggregations/metric/index/
  5. Documentation page for Concurrent segment search in OpenSearch https://opensearch.org/docs/latest/search-plugins/concurrent-segment-search/

Feedback Required

As solution may drop support for certain aggregation types depending on the preferred approach, we mainly want to get some feedback if those unsupported aggregations are widely used. This will help us to understand if additional effort is required to support them, and if it's not possible then how system should behave.

@martin-gaievski
Copy link
Member Author

After doing more testing and deep dives I had to update RFC on Option 1. I'll put updates in RFC description, but main changes are listed below for visibility:
pros and cons section:

  • conflicting with concurrent search feature and thus 3 aggregations are supported when that feature is enabled
  • farther extensions like adding post filters are not feasible

I'm working on one more option, that needs more POCing before it can be included to the RFC. That may be a new Preferred one if I'm correct in my assumptions

@martin-gaievski
Copy link
Member Author

martin-gaievski commented Feb 22, 2024

I'm adding Option 3 that should be our preferred option for number of reasons: it's extensible, it supports all aggregations and it's compatible with concurrent search (that went GA in 2.12). Many thanks to @reta who reviewed and provided guidance for this option


Solution Details

Option 3: New Collector Manager on pre QueryPhaseSearcher stage [Preferred]

Pros:

  • utilizes new lucene/core CollectorManager API
  • compatible with concurrent search feature: all aggs will work
  • extensible for post filters
  • simplify query phase searcher implementation (maybe can just call core implementation, POC is needed)
  • in short term can utilize existing extension point for aggregation processor

Cons:

  • complex implementation, we’ll have three separate points (pre/post search and query phase searcher) instead on one (query phase searcher)
  • for long-term solution new extension point needs to be added to core OS

This Option is similar to Option 1, main difference is that new collector manager should be added to the context before other core collector managers. In such case we can rely on existing core code to execute collectors in correct order and reduce results at the end of search. In contrast for Option 1 that has to be done in custom QueryPhaseSearcher for hybrid query.

Following classes will be added with this option

Normalization and Score combination-Copy of Class Diagram QueryPhaseSearcher - Aggs drawio

Below is main sequence of flow for this approach for scenario when concurrent search is enabled and disabled. Main differences are:

  • for non-concurrent search reduce operation on collector manager is not called as part of Searcher.search and must be executed manually at later stage (aggregationProcessor.postProcess)
  • to be achieve point 1 collector must be re-used, because it has all the actual scores for hybrid query. It can be done via adding state to hybrid collector manager, similar to what is done for global aggregations in core.

Simpler case of concurrent search, newCollector and reduce methods are called inside the Searcher.search

Normalization and Score combination-Sequence diagram Collector Manager at Aggs processor Option - concurrentSearch drawio

Sequence diagram for case when Concurrent Search is enabled

More complex case of non-concurrent search, only newCollector called inside the Searcher.search and reduce must be called manually

Normalization and Score combination-Copy of Sequence diagram Collector Manager at Aggs processor Option - default search drawio

Sequence diagram for case when Concurrent Search is disabled

All tested aggregations are supported by this Option. That includes potential new aggregations added with default support mode for only default search.

Short Term/Long Term implementation

For preferred option there are short and long term implementations. Main differentiator is extension point for adding custom collector manager in a query phase. We need at least two extension points that are executed one before and one after the searcher.search()

Short term
Existing extension point of Aggregator Processor can be utilized. In fits requirement as per current implementation of QueryPhase.

  • preProcess - plugin registers new collector manager
  • postProcess - plugin execute reduce phase and format scores in format required for normaliziation

Aggregator Processor is returned by the plugin specific QueryPhaseSearcher. Plugins can register QueryPhaseSearcher, only on is allowed from all plugins.

Long term
New extension points added. Exact names are TBD, draft is collector manager configurator. One can be provided by the plugin class or plugin specific Query Phase Searcher.

Switching from Short to Long term shouldn’t cause massive changes in code of Collector Manager and Collector and can be done quickly. Main dependency is on adding proper extension point to core.

@reta
Copy link
Contributor

reta commented Feb 22, 2024

Thanks a lot @martin-gaievski , just to flash out a bit of context here:

  • the SearchContext::queryCollectorManagers is a long standing extension point for injecting collector managers at search phase: the access to it sealed though and it is not possible for plugins to add new ones before kicking of the search, we should fix that
  • the SearchContext::queryCollectorManagers is not very well integrated into non-concurrent search flow since the final CollectorManager::reduce is not called, we should fix that

We probably should not touch the existing Aggegator Processor flow (even if its lifecycle fits well) but have a new kind of hook / extension to allow adding to the SearchContext::queryCollectorManagers before and find out the way to manage non-concurrent flow better.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants