Collaborator
@Swiddis Swiddis commented Sep 22, 2025

Description

In local benchmarking of merge operations, I saw we were spending a lot of time waiting for synchronous fetching of batches across both indices.

Because of the PIT-based design, we can't parallelize page fetches directly, but one low-hanging fruit here is to start fetching the next batch as soon as we get the current one, so by the time we start the next batch it'll already be halfway ready. This cuts enumerated merge times by ~40%.

To implement this safely, this PR needs to do a few things:

  • Register a new thread pool that has authentication context (we can't run background threads if we don't do this)
    • See SQLPlugin.java changes. I also fixed our thread configuration settings.
    • We need a new pool as we'll hang the worker pool if there's only one thread.
  • Safely handle whether we have a NodeClient or not within the Calcite enumeration inner loop
    • This was the interface change in OpenSearchClient.java, I did several plumbing changes around that update.
  • Actually implement the background scanner, with a fallback to synchronous scanning if we're missing node context. BackgroundSearchScanner.java
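The steps above boil down to a one-batch-ahead pipeline: while the caller consumes batch N, batch N+1 is already in flight on a background executor. Here is a minimal, self-contained sketch of that pattern using only `java.util.concurrent`; the names (`BackgroundScanner`, `fetchBatch`) are illustrative stand-ins, not the PR's actual API, and `fetchBatch` fakes a paged search call.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the one-batch-ahead prefetch pattern described above.
public class BackgroundScanner {
  private final ExecutorService executor;
  private CompletableFuture<List<Integer>> nextBatch;
  private int cursor = 0;

  BackgroundScanner(ExecutorService executor) {
    this.executor = executor;
  }

  // Stand-in for a paged search call (e.g. one PIT/search_after page).
  private List<Integer> fetchBatch(int from) {
    return List.of(from, from + 1, from + 2);
  }

  void start() {
    // Kick off the first fetch immediately.
    nextBatch = CompletableFuture.supplyAsync(() -> fetchBatch(cursor), executor);
  }

  List<Integer> takeBatch() {
    // Block for the in-flight batch, then immediately prefetch the next one,
    // so the network wait overlaps with the caller's processing time.
    List<Integer> batch = nextBatch.join();
    cursor += batch.size();
    nextBatch = CompletableFuture.supplyAsync(() -> fetchBatch(cursor), executor);
    return batch;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    BackgroundScanner scanner = new BackgroundScanner(pool);
    scanner.start();
    System.out.println(scanner.takeBatch()); // [0, 1, 2]
    System.out.println(scanner.takeBatch()); // [3, 4, 5]
    pool.shutdown();
  }
}
```

The dedicated executor is what makes the second bullet necessary: if prefetch tasks shared the worker pool, a single-threaded pool could deadlock waiting on its own queued fetch.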

Some alternatives for the long-term:

In draft pending testing.

Related Issues

N/A

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
@Swiddis Swiddis added the enhancement New feature or request label Sep 22, 2025
@Swiddis
Collaborator Author

Swiddis commented Sep 23, 2025

The security IT failures are confusing me here -- they're all failing consistently, but the changed code doesn't show up anywhere in any of the stack traces.

@Swiddis
Collaborator Author

Swiddis commented Sep 25, 2025

Some additional testing info:

I took 5 million records from the big5 benchmarking dataset and compared the current mainline with this.

First, as sanity, the results are the same for one of the queries requiring a full index enumeration:

source = big5
| eval range_bucket = case(
   `metrics.size` < -10, 'range_1',
   `metrics.size` >= -10 and `metrics.size` < 10, 'range_2',
   `metrics.size` >= 10 and `metrics.size` < 100, 'range_3',
   `metrics.size` >= 100 and `metrics.size` < 1000, 'range_4',
   `metrics.size` >= 1000 and `metrics.size` < 2000, 'range_5',
   `metrics.size` >= 2000, 'range_6')
| stats count() by range_bucket, span(`@timestamp`, 1h) as auto_span
| sort + range_bucket, + auto_span

Current mainline:

fetched rows / total rows = 48/48
+---------+---------------------+--------------+
| count() | auto_span           | range_bucket |
|---------+---------------------+--------------|
| 122464  | 2022-12-31 16:00:00 | range_5      |
| 121585  | 2022-12-31 17:00:00 | range_5      |
| 122052  | 2022-12-31 18:00:00 | range_5      |
| 122220  | 2022-12-31 19:00:00 | range_5      |
| 122163  | 2022-12-31 20:00:00 | range_5      |
| 121840  | 2022-12-31 21:00:00 | range_5      |
| 121606  | 2022-12-31 22:00:00 | range_5      |
| 121889  | 2022-12-31 23:00:00 | range_5      |
| 121088  | 2023-01-01 00:00:00 | range_5      |
| 121943  | 2023-01-01 01:00:00 | range_5      |

After update:

fetched rows / total rows = 48/48
+---------+---------------------+--------------+
| count() | auto_span           | range_bucket |
|---------+---------------------+--------------|
| 122464  | 2022-12-31 16:00:00 | range_5      |
| 121585  | 2022-12-31 17:00:00 | range_5      |
| 122052  | 2022-12-31 18:00:00 | range_5      |
| 122220  | 2022-12-31 19:00:00 | range_5      |
| 122163  | 2022-12-31 20:00:00 | range_5      |
| 121840  | 2022-12-31 21:00:00 | range_5      |
| 121606  | 2022-12-31 22:00:00 | range_5      |
| 121889  | 2022-12-31 23:00:00 | range_5      |
| 121088  | 2023-01-01 00:00:00 | range_5      |
| 121943  | 2023-01-01 01:00:00 | range_5      |

Second, I wanted to benchmark and check for impact. I already tested with joins and it's ~40% faster, but for non-joins we potentially pay overhead for nothing.

For the slowest big5 queries (BG fetches on the left, sync fetches on the right), we see slight perf gains:
[screenshot: benchmark comparison for the slowest queries]

For the fastest ones, the performance is approximately the same (some minor latency and throughput diffs but I'm not confident that this isn't just random variation):
[screenshot: benchmark comparison for the fastest queries]

settings,
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME,
SQL_WORKER_THREAD_POOL_NAME,
OpenSearchExecutors.allocatedProcessors(settings),
Member

I am not sure if this is the best number.

Collaborator Author

Ideally it should match the number of search threads since that's where all the requests go, maybe I can find where that number is stored and do a lookup.

Collaborator Author

Updated to pull the search thread pool count if available, otherwise fall back to the node's processor count. This is what it looks like if you limit the search thread pool under heavy load:

[screenshot: SQL worker thread pool metrics under heavy load]

Intuitively this seems like a pretty informative view of what state the cluster's in regarding SQL queries.
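The sizing rule discussed here -- prefer the configured search thread pool size, otherwise fall back to the processor count -- can be sketched in a few lines. This is a hedged illustration: the settings key `thread_pool.search.size` and the plain `Map` stand in for the real OpenSearch `Settings` lookup.

```java
import java.util.Map;

// Illustrative sketch of the pool-sizing fallback: use the configured search
// pool size when present, otherwise the node's processor count.
public class PoolSize {
  static int poolSize(Map<String, String> settings, int processors) {
    String configured = settings.get("thread_pool.search.size");
    return configured != null ? Integer.parseInt(configured) : processors;
  }

  public static void main(String[] args) {
    System.out.println(poolSize(Map.of("thread_pool.search.size", "13"), 8)); // 13
    System.out.println(poolSize(Map.of(), 8)); // 8
  }
}
```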

public void startScanning(OpenSearchRequest request) {
  if (isAsync()) {
    nextBatchFuture =
        CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor);
Member

What am I missing? Why do we need the context copy in the other file? Can you print the thread context in this task and see if it has user credentials in case of FGAC?

PPLPermissionsIT: can we add a join test here?

Collaborator Author

This isn't just used for joins; every query goes through this interface. I realized while benchmarking that every query was hitting the pool. That the security ITs pass means either this works or we don't have security ITs.

I believe it works because we supply the cluster settings during construction of the executor, so the context is built into the thread pool (as opposed to starting a fresh thread outside any executor).

@vamsimanohar
Member

Added a few comments. Good one 👍.

@Swiddis Swiddis added the v3.3.0 label Sep 29, 2025
vamsimanohar
vamsimanohar previously approved these changes Sep 30, 2025
  }
  this.client = client;
  this.bgScanner = new BackgroundSearchScanner(client);
  this.bgScanner.startScanning(request);
Collaborator

Is this constructor only called once per query? I found these comments; could you double-confirm?

  /**
   * This Enumerator may be iterated for multiple times, so we need to create opensearch request for
   * each time to avoid reusing source builder. That's because the source builder has stats like PIT
   * or SearchAfter recorded during previous search.
   */
  @Override
  public Enumerable<@Nullable Object> scan() {
    return new AbstractEnumerable<>() {
      @Override
      public Enumerator<Object> enumerator() {
        OpenSearchRequestBuilder requestBuilder = getOrCreateRequestBuilder();
        return new OpenSearchIndexEnumerator(
            osIndex.getClient(),
            getFieldPath(),
            requestBuilder.getMaxResponseSize(),
            requestBuilder.getMaxResultWindow(),
            osIndex.buildRequest(requestBuilder),
            osIndex.createOpenSearchResourceMonitor());
      }
    };
  }

Collaborator Author

It's the same as the current behavior, right? If you recreate the enumerator with a new client, you erase all of its current state and start a new search. In that snippet it looks like the search is deliberately meant to be restarted multiple times.

Collaborator

The concern is: if the scan() method is called multiple times during the planning stage, it will invoke startScanning multiple times.

Collaborator Author

Woah, I wouldn't have expected planning to make a call to scan(); that seems weird. I can try to find a better way to handle that, but scan intuitively means "actually start scanning something" to me.
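One common way to defuse this kind of repeated-call concern is to make the start idempotent, so that even if scan() is invoked several times during planning, only the first call triggers a fetch. This is a hypothetical sketch of that guard, not the PR's actual resolution:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative idempotent-start guard: repeated calls to startScanning()
// kick off at most one background fetch.
public class IdempotentStart {
  private final AtomicBoolean started = new AtomicBoolean(false);
  private int fetchCount = 0;

  void startScanning() {
    // Only the first caller wins; subsequent calls are no-ops.
    if (started.compareAndSet(false, true)) {
      fetchCount++; // stand-in for submitting the background fetch
    }
  }

  public static void main(String[] args) {
    IdempotentStart s = new IdempotentStart();
    s.startScanning();
    s.startScanning();
    s.startScanning();
    System.out.println(s.fetchCount); // 1
  }
}
```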

Comment on lines +143 to +144
nextBatchFuture =
    CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor);
Collaborator

What if the fixed thread pool is full? Should this fall back to sync?

Collaborator Author

@Swiddis Swiddis Sep 30, 2025

We just buffer: #4345 (comment)

If we fall back to sync, it eliminates the utility of being able to directly view/control the active SQL network requests via the BG thread pool.
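The "we just buffer" behavior can be demonstrated with a plain `ThreadPoolExecutor`: a fixed pool backed by an unbounded `LinkedBlockingQueue` accepts submissions past its thread count and queues them rather than rejecting them or running them on the caller's thread. A small self-contained demonstration (names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Demonstrates that a fixed pool with an unbounded queue buffers excess
// tasks instead of rejecting them, which is why no sync fallback is needed.
public class BufferingDemo {
  public static void main(String[] args) throws Exception {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    CountDownLatch release = new CountDownLatch(1);

    pool.execute(() -> { // occupies the single worker thread
      try { release.await(); } catch (InterruptedException e) { }
    });
    pool.execute(() -> { }); // buffered in the queue, not rejected
    pool.execute(() -> { }); // buffered in the queue, not rejected

    System.out.println("queued=" + pool.getQueue().size()); // queued=2
    release.countDown();
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

The trade-off noted above is that buffered work stays visible in the pool's queue metrics, whereas a sync fallback would hide it from that view.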

@Swiddis Swiddis removed the v3.3.0 label Sep 30, 2025
@Swiddis Swiddis added the calcite calcite migration releated label Oct 3, 2025
@Swiddis
Copy link
Collaborator Author

Swiddis commented Oct 7, 2025

Turns out I flipped the benchmark in my head, so this is overall a regression -- going to put it back in draft and figure out a better approach.

@Swiddis Swiddis marked this pull request as draft October 7, 2025 23:42
