-
Notifications
You must be signed in to change notification settings - Fork 176
Implement one-batch lookahead for index enumerators #4345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Implement one-batch lookahead for index enumerators #4345
Conversation
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
Security IT failures are confusing me here -- seems like they're all consistently failing but the changed code doesn't show up anywhere in any of the stack traces |
…atches-in-enumeration
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
settings, | ||
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME, | ||
SQL_WORKER_THREAD_POOL_NAME, | ||
OpenSearchExecutors.allocatedProcessors(settings), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if this is the best number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally it should match the number of search threads since that's where all the requests go, maybe I can find where that number is stored and do a lookup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java
Show resolved
Hide resolved
public void startScanning(OpenSearchRequest request) { | ||
if (isAsync()) { | ||
nextBatchFuture = | ||
CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am surprised this is working without copying the thread context like this: https://github.com/opensearch-project/sql/blob/main/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java#L39
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What am I missing? Why do we need context copy in other file. Can you print the thread context in this task and see if it has user credentials in case of FGAC?
PPLPermissionsIT can we add a join test here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't just used for joins, every query goes through this interface. I realized while benchmarking that there were no queries that weren't hitting the pool. That the security ITs pass means either this works or we don't have security ITs.
I believe it works because we supply the cluster settings during the construction of the executor, so it's built-in to the thread context (as opposed to starting a fresh thread with no executor)
Added few comments. Good one 👍 . |
…atches-in-enumeration
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java
Outdated
Show resolved
Hide resolved
} | ||
this.client = client; | ||
this.bgScanner = new BackgroundSearchScanner(client); | ||
this.bgScanner.startScanning(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constructor only been called once per query? I found comments, Could u double confirm?
/**
* This Enumerator may be iterated for multiple times, so we need to create opensearch request for
* each time to avoid reusing source builder. That's because the source builder has stats like PIT
* or SearchAfter recorded during previous search.
*/
@Override
public Enumerable<@Nullable Object> scan() {
return new AbstractEnumerable<>() {
@Override
public Enumerator<Object> enumerator() {
OpenSearchRequestBuilder requestBuilder = getOrCreateRequestBuilder();
return new OpenSearchIndexEnumerator(
osIndex.getClient(),
getFieldPath(),
requestBuilder.getMaxResponseSize(),
requestBuilder.getMaxResultWindow(),
osIndex.buildRequest(requestBuilder),
osIndex.createOpenSearchResourceMonitor());
}
};
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the same as current behavior, right? If you recreate the enumerator with a new client, you erase all of its current state and start a new search. In that snippet it looks like this is deliberately meant to restart the search multiple times
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concerns is, If scan() metheod is been called multiple times in planning stage, it will invoke startScanning multiple times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woah, I wouldn't have expected planning to make a call to scan, seems weird... Can try to find a better way to handle that, but scan intuitively to me means "actually start scanning something"
nextBatchFuture = | ||
CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if fixedThreadPool full, should fallback to sync?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We just buffer: #4345 (comment)
If we fallback to sync, it eliminates the utility of being able to directly view/control the active SQL network requests via the BG thread pool
...arch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
Signed-off-by: Simeon Widdis <sawiddis@amazon.com>
…atches-in-enumeration
Turns out I flipped the benchmark in my head, so this is overall a regression -- going to put back in draft and figure out a better approach |
Description
In local benchmarking of merge operations, I saw we were spending a lot of time waiting for synchronous fetching of batches across both indices.
Because of the PIT-based design, we can't parallelize page fetches directly, but one low-hanging fruit here is to start fetching the next batch as soon as we get the current one, so by the time we start the next batch it'll already be halfway ready. This cuts enumerated merge times by ~40%.
To implement this safely, this PR needs to do a few things:
SQLPlugin.java
changes. I also fixed our thread configuration settings.OpenSearchClient.java
, I did several plumbing changes around that update.BackgroundSearchScanner.java
Some alternatives for the long-term:
In draft pending testing.
Related Issues
N/A
Check List
--signoff
or-s
.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.