[RFC] 200x faster joins with ID pre-filtering #5093

@Swiddis

Problem Statement

Joins are slow.

Current State

One particular reason they're slow for us is that they're prone to doing full index scans in search of matching records. Consider a query such as:

source=request_logs
| lookup dim_lookup host_key append service_name, environment, region
| where _lookup = "host"
| where service_name = "payment-service"
| head
| fields request_id, region;

Intuitively: "Find the region of 10 requests from hosts involved with the payment service."

In the current implementation, this turns into two index scans: one goes through the lookup index in search of matches, and the other goes through all of request_logs.

opensearchsql> explain source=request_logs | lookup dim_lookup host_key append _lookup, service_name, environment, region | where _lookup = "host" | where service_name = "payment-service" | head 10 | fields request_id, host_key, region;
= Calcite Plan =
== Logical ==
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
  LogicalProject(request_id=[$17], host_key=[$10], region=[$29])
    LogicalSort(fetch=[10])
      LogicalFilter(condition=[=($28, 'payment-service')])
        LogicalFilter(condition=[=($26, 'host')])
          LogicalProject(trace_id=[$0], endpoint_key=[$1], bytes_received=[$2], bytes_sent=[$3], date_key=[$4], latency_ms=[$5], request_count=[$6], response_body_size=[$7], is_timeout=[$8], client_key=[$9], host_key=[$10], request_timestamp=[$11], upstream_latency_ms=[$12], request_body_size=[$13], retry_count=[$14], time_to_first_byte_ms=[$15], is_error=[$16], request_id=[$17], status_key=[$18], time_key=[$19], _id=[$20], _index=[$21], _score=[$22], _maxscore=[$23], _sort=[$24], _routing=[$25], _lookup=[$26], environment=[$27], service_name=[$29], region=[$30])
            LogicalJoin(condition=[=($10, $28)], joinType=[left])
              CalciteLogicalIndexScan(table=[[OpenSearch, request_logs]])
              LogicalProject(_lookup=[$4], environment=[$44], host_key=[$28], service_name=[$9], region=[$17])
                CalciteLogicalIndexScan(table=[[OpenSearch, dim_lookup]])

== Physical ==
EnumerableLimit(fetch=[10000])
  EnumerableCalc(expr#0..5=[{inputs}], request_id=[$t5], host_key=[$t4], region=[$t3])
    EnumerableLimit(fetch=[10])
      EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
        CalciteEnumerableIndexScan(table=[[OpenSearch, dim_lookup]], PushDownContext=[[PROJECT->[_lookup, host_key, service_name, region], FILTER->AND(=($0, 'host'), =($2, 'payment-service'))], OpenSearchRequestBuilder(sourceBuilder={
          "from": 0,
          "timeout": "1m",
          "query": {
            "bool": {
              "must": [
                {
                  "term": {
                    "_lookup.keyword": {
                      "value": "host",
                      "boost": 1.0
                    }
                  }
                },
                {
                  "term": {
                    "service_name.keyword": {
                      "value": "payment-service",
                      "boost": 1.0
                    }
                  }
                }
              ],
              "adjust_pure_negative": true,
              "boost": 1.0
            }
          },
          "_source": {
            "includes": [
              "_lookup",
              "host_key",
              "service_name",
              "region"
            ],
            "excludes": []
          }
        }, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
        CalciteEnumerableIndexScan(table=[[OpenSearch, request_logs]], PushDownContext=[[PROJECT->[host_key, request_id]], OpenSearchRequestBuilder(sourceBuilder={
          "from": 0,
          "timeout": "1m",
          "_source": {
            "includes": [
              "host_key",
              "request_id"
            ],
            "excludes": []
          }
        }, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

For 2 million request_logs records, this takes about 17 seconds on my machine.

$ hyperfine 'cat /tmp/query.ppl | xh post http://localhost:9200/_plugins/_ppl' --warmup 2
Benchmark 1: cat /tmp/query.ppl | xh post http://localhost:9200/_plugins/_ppl
  Time (mean ± σ):     16.791 s ±  0.080 s    [User: 0.007 s, System: 0.004 s]
  Range (min … max):   16.654 s … 16.948 s    10 runs

Long-Term Goals

This is a special-case optimization to make sparse star joins faster (star joins are explained under Proposal below). These types of joins are very common in analytics.

Proposal

Looking at the query, we're currently doing something like this:

  • Take the subset of dim_lookup matching our filters (2 records)
  • Take the subset of request_logs matching our filters (no filters, 2 million records)
  • Go through both, collecting all the records into a hash table keyed by the join key
  • Go through the hash and find 10 records matching all conditions. (We can't push this limit down through the hash as matches may be sparse in the general case.)

But we're joining on a key, so why do we need to go through all of request_logs first? Suppose we instead did something like this:

  • Take the subset of dim_lookup matching our filters (2 records)
  • Take the subset of request_logs matching our filters and having the host_keys matched by the lookup search (138k records)
  • Take the first 10 matches (guaranteed to be matches due to pre-filtering, so we need no extra collection)
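
The two strategies above can be compared on toy data. This is only a sketch: the in-memory lists, the sample values, and the function names are made up for illustration, and it assumes host_key is unique in the lookup rows. In OpenSearch the key filter would be evaluated by the index rather than by a Python loop.

```python
from itertools import islice

# Toy stand-ins for the two indices; field names follow the example query,
# the values are invented.
dim_lookup = [
    {"_lookup": "host", "host_key": "h1", "service_name": "payment-service", "region": "us-east"},
    {"_lookup": "host", "host_key": "h2", "service_name": "payment-service", "region": "eu-west"},
    {"_lookup": "host", "host_key": "h3", "service_name": "search-service", "region": "us-west"},
]
request_logs = [{"request_id": f"r{i}", "host_key": f"h{i % 3 + 1}"} for i in range(30)]


def matching_dims(rows):
    """First step of both plans: filter the small lookup side."""
    return [r for r in rows if r["_lookup"] == "host" and r["service_name"] == "payment-service"]


def hash_join_then_limit(logs, dims, limit):
    """Current plan: scan every log row, probe the hash, then apply the limit."""
    by_key = {d["host_key"]: d for d in dims}  # assumes unique host_key per lookup row
    scanned = 0
    joined = []
    for row in logs:  # nothing restricts this side, so it is a full scan
        scanned += 1
        dim = by_key.get(row["host_key"])
        if dim is not None:
            joined.append({"request_id": row["request_id"], "region": dim["region"]})
    return joined[:limit], scanned


def prefilter_then_limit(logs, dims, limit):
    """Proposed plan: restrict the log scan to matched keys, then take the first rows."""
    by_key = {d["host_key"]: d for d in dims}
    candidates = (row for row in logs if row["host_key"] in by_key)
    first = islice(candidates, limit)  # every candidate is a match, so the limit is safe here
    return [{"request_id": r["request_id"], "region": by_key[r["host_key"]]["region"]} for r in first]
```

Both functions return the same 10 rows for this data; the difference is that the first one touches all 30 log rows before the limit applies, while the second stops as soon as 10 pre-filtered rows are found.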

In the data warehousing space, this strategy is called a star join (or semijoin). In analytics, you typically have many dimension tables that supply context, and these can be used to rapidly filter and process the central fact table, which holds the main metrics for your business process. From a user's perspective, this allows for easy report generation. From a query optimizer's perspective, you pre-compute the matching foreign keys and use them to quickly scan the fact table, then do the same process in reverse to create the report.

Image (Source: Kimball's The Data Warehouse Toolkit, 3e)

I did a proof-of-concept to test this (hacked together with AI for demonstration; I wouldn't trust its correctness outside this specific query). In the proof-of-concept, we pre-execute the lookup part and use the matched keys to build a terms query on the main request search. This approach should work well with any lookup that returns fewer than 1000 keys or so.
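
As a rough illustration of the pre-filter step, the sketch below turns a set of matched keys into a terms query body and declines when there are too many keys. The function name and the `MAX_PREFILTER_KEYS` constant are hypothetical; the cutoff is just the rough estimate from the paragraph above, not a measured threshold, and this is not the proof-of-concept code.

```python
from typing import Iterable, Optional

# Assumption: cutoff taken from the rough "1000 keys or so" estimate above.
MAX_PREFILTER_KEYS = 1000


def build_terms_prefilter(
    field: str, keys: Iterable[str], max_keys: int = MAX_PREFILTER_KEYS
) -> Optional[dict]:
    """Return a terms query body for `field`, or None if pre-filtering should be skipped."""
    unique = sorted(set(keys))  # dedupe; sorting only makes the output deterministic
    if not unique or len(unique) > max_keys:
        # No keys, or too many: the caller should fall back to the normal join plan.
        return None
    return {"query": {"terms": {field: unique}}}
```

For the example query, calling `build_terms_prefilter("host_key.keyword", matched_host_keys)` with the two matched keys would produce a body with the same shape as the `terms` clause in the plan below.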

opensearchsql> explain source=request_logs | lookup dim_lookup.host host_key append service_name, environment, region | where service_name = "payment-service" | head 10 | fields request_id, host_key, region;
= Calcite Plan =
== Logical ==
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
  LogicalProject(request_id=[$17], host_key=[$10], region=[$28])
    LogicalSort(fetch=[10])
      LogicalFilter(condition=[=($27, 'payment-service')])
        LogicalProject(trace_id=[$0], endpoint_key=[$1], bytes_received=[$2], bytes_sent=[$3], date_key=[$4], latency_ms=[$5], request_count=[$6], response_body_size=[$7], is_timeout=[$8], client_key=[$9], host_key=[$10], request_timestamp=[$11], upstream_latency_ms=[$12], request_body_size=[$13], retry_count=[$14], time_to_first_byte_ms=[$15], is_error=[$16], request_id=[$17], status_key=[$18], time_key=[$19], _id=[$20], _index=[$21], _score=[$22], _maxscore=[$23], _sort=[$24], _routing=[$25], environment=[$26], service_name=[$28], region=[$29])
          LogicalJoin(condition=[=($10, $27)], joinType=[left])
            CalciteLogicalIndexScan(table=[[OpenSearch, request_logs]])
            LogicalProject(environment=[$44], host_key=[$28], service_name=[$9], region=[$17])
              LogicalFilter(condition=[=($4, 'host')])
                CalciteLogicalIndexScan(table=[[OpenSearch, dim_lookup]])

== Physical ==
EnumerableLimit(fetch=[10000])
  EnumerableCalc(expr#0..4=[{inputs}], request_id=[$t1], host_key=[$t0], region=[$t4])
    EnumerableLimit(fetch=[10])
      EnumerableHashJoin(condition=[=($0, $2)], joinType=[inner])
        CalciteEnumerableIndexScan(table=[[OpenSearch, request_logs]], PushDownContext=[[PROJECT->[host_key, request_id], LIMIT->10, TERMS_FILTER->host_key IN (2 values)], OpenSearchRequestBuilder(sourceBuilder={
          "from": 0,
          "size": 10,
          "timeout": "1m",
          "query": {
            "terms": {
              "host_key.keyword": [
                "c2efc86c-1265-4c94-9c22-50612bd0ec42",
                "af8f3c89-1ded-4124-bb73-ac12a128df7d"
              ],
              "boost": 1.0
            }
          },
          "_source": {
            "includes": [
              "host_key",
              "request_id"
            ],
            "excludes": []
          }
        }, requestedTotalSize=10, pageSize=null, startFrom=0)])
        CalciteEnumerableIndexScan(table=[[OpenSearch, dim_lookup]], PushDownContext=[[PROJECT->[_lookup, service_name, region, host_key], FILTER->=($0, 'host'), PROJECT->[host_key, service_name, region], FILTER->=($1, 'payment-service')], OpenSearchRequestBuilder(sourceBuilder={
          "from": 0,
          "timeout": "1m",
          "query": {
            "bool": {
              "filter": [
                {
                  "term": {
                    "_lookup.keyword": {
                      "value": "host",
                      "boost": 1.0
                    }
                  }
                },
                {
                  "term": {
                    "service_name.keyword": {
                      "value": "payment-service",
                      "boost": 1.0
                    }
                  }
                }
              ],
              "adjust_pure_negative": true,
              "boost": 1.0
            }
          },
          "_source": {
            "includes": [
              "host_key",
              "service_name",
              "region"
            ],
            "excludes": []
          }
        }, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

This runs ~240x faster in this case. In general, the run time is the duration of the two sub-queries plus constant overhead.

$ hyperfine 'cat /tmp/query.ppl | xh post http://localhost:9200/_plugins/_ppl' --warmup 2
Benchmark 1: cat /tmp/query.ppl | xh post http://localhost:9200/_plugins/_ppl
  Time (mean ± σ):      69.8 ms ±   8.6 ms    [User: 7.3 ms, System: 2.5 ms]
  Range (min … max):    57.0 ms …  86.8 ms    34 runs
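
As a quick check on the ~240x figure, the ratio can be computed from the two hyperfine means reported in this issue (single machine, single query, so treat it as indicative only):

```python
baseline_s = 16.791     # mean of the original plan, from the first hyperfine run
prefiltered_s = 0.0698  # mean of the proof-of-concept plan (69.8 ms)

speedup = baseline_s / prefiltered_s
print(f"speedup: {speedup:.1f}x")
```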

Approach

For a production implementation, I vaguely see two approaches to make this work. I haven't worked out either in a ton of detail.

  • One is to extend the POC approach of frontloading all of this in the planning stage. This makes planning more expensive for joins and introduces some edge cases like "what if the subquery fails?", but when the pre-filter applies, it makes the join dramatically faster. (For analytics-style queries where we add up all the results, we still iterate through far less of the index.)
  • The other is an adaptive execution approach: we start the hash join as normal, and if we realize midway that the result set from one index is small, we restart the scan of the other index with the terms query applied.
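
To make the adaptive idea a bit more concrete, here is a simplified sketch. It assumes the build side is read completely before the probe scan starts, so the "restart" reduces to choosing between a filtered and an unfiltered probe scan. The names `adaptive_join`, `scan_probe`, and `max_keys` are hypothetical and do not correspond to anything in the plugin or in Calcite.

```python
from typing import Callable, Iterable, Iterator, Optional

Row = dict
# scan_probe(keys) yields rows from the large index; keys=None means an unfiltered scan.
Scan = Callable[[Optional[set]], Iterable[Row]]


def adaptive_join(
    build_rows: Iterable[Row], scan_probe: Scan, key: str, max_keys: int = 1000
) -> Iterator[tuple]:
    """Hash join that switches to a key-filtered probe scan when the build side is small."""
    table: dict = {}
    for row in build_rows:
        table.setdefault(row[key], []).append(row)

    # Decision point: the distinct key count is only known once the build side is read.
    keys = set(table) if len(table) <= max_keys else None
    for probe in scan_probe(keys):
        # With a filtered scan every probe row matches; with an unfiltered scan most may not.
        for match in table.get(probe[key], []):
            yield probe, match
```

A real implementation would also need to decide what to do when both scans are already in flight, which is where the restart cost mentioned above comes in.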

I'm open to comments on either approach. I'm just opening this to float the ideas around, and I'll probably be adding to this RFC over time.

Alternative

N/A

Implementation Discussion

N/A
