Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft implementation of pagination in v2 query engine. #1289

Closed

Conversation

MaxKsyunz
Copy link
Collaborator

@MaxKsyunz MaxKsyunz commented Jan 20, 2023

This PR shows a prototype of pagination in v2 engine.

It is purely for feedback and will not be merged.

Detailed description of query processing

Here's how a REST query that requests a page is processed by the SQL engine.

There are two types of page requests -- get first page and get a subsequent page.

Get first page request includes a query to execute. It is handled exactly the same as non-page requests.

Get subsequent page request includes cursor information. It is executed the same as other requests but does not need parse, analyze, or plan steps.

Step 1 — routing

If query is {"query" : "<statement>", "fetch_size": <number>}, it is passed to SQLService.execute. Only a page may be returned with a cursor ID to get more.

Else if query is {"cursor" : "<cursor_id>" } and cursor_id is a v2 cursor, it is passed to SQLService.execute. Otherwises it's processed by the v1 engine.

Step 2 — SQLService.execute

Creates an AbstractPlan and submits it to QueryManager.submit.

IF query is a v2 cursor -- creates ContinuePaginatedPlan. See QueryPlanFactory.create(String cursor,...

ELSE IF query is a statement and contains fetch_size > 0, parses the statement and creates a PaginatedPlan.

Submits the created abastract plan to QueryManager.submit.

See SQLService.plan, PaginatedPlan, ContinuePaginatedPlan`.

Step 3 — QueryManager.submit (OpenSearchQueryManager)

Schedules queryPlan.execute() as before.

Step 4 — PaginatedPlan.execute

Wraps the statement plan into Paginate unresolved plan and passes it to QueryService.execute.

QueryService.execute processes the unresolved plan as before to generate a physical plan.

Physical plan is passed to ExecutionEngine.execute.

Paginate — LogicalPaginate — PaginateOperator

Paginate unresolved plan is mapped to PaginateOperator.

PaginateOperator only returns up to fetch_size number of values from its input plan.

Step 4' — ContinuePaginatedPlan.execute

Uses PaginatedPlanCache to get a saved physical plan.

Passes it to QueryService.executePlan -- by-passes analysis and planning.

QueryService.executePlan(PhysicalPlan) passes it to ExecutionEngine.execute.

PaginatedPlanCache

An object that knows how to serialize a physical plan into a cursor and deserialize a physical plan from a cursor.

Simplest (current) implementation only works with one kind of physical plan. It saves unique parameters in the cursor and, conversely, generates physical plan from parameters encoded in the cursor. This is similar to how v1 engine works.

Future option could be using a different storage method, such as an OpenSearch index, for an arbitrary physical plan.

Step 5 — OpenSearchExecutionEngine.execute

Executes physical plan as usual.

After all values from execution are saved, uses PaginatedPlanCache.convertToCursor to possibly generate a cursor.

Passes the cursor to QueryResponse and calls response listener onResponse.

PaginatedPlanCache.convertToCursor

If passed physical plan starts with PaginateOperator, creates a cursor to that, when executed, would get the next page.

Currently, all relevant data will be encoded in the cursor id. In the future, it could be stored some place else -- like an index.

Step 6 — Response

QueryResponse object is processed by JdbcResponseFormatter.

JdbcResponseFormatter will add cursor field if QueryResponse contains a non-null cursor.

Open Problems

  1. How to synchronize PaginateOperator and OpenSearchScrollRequest and OpenSearchIndexScan. Ideally, when REST query has fetch_size and index scanning uses scroll requests, scroll window will match fetch_size.

  2. How to store unconsumed output rows when fetch_size in PaginateOperator is less than max winow size. Say uses is trying to paginate a simple select * query.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

MaxKsyunz added 3 commits January 19, 2023 14:30
Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
@codecov-commenter
Copy link

Codecov Report

Merging #1289 (51088a2) into main (13faca4) will decrease coverage by 1.07%.
The diff coverage is 26.86%.

@@             Coverage Diff              @@
##               main    #1289      +/-   ##
============================================
- Coverage     98.34%   97.28%   -1.07%     
- Complexity     3600     3603       +3     
============================================
  Files           343      349       +6     
  Lines          8908     9014     +106     
  Branches        567      569       +2     
============================================
+ Hits           8761     8769       +8     
- Misses          142      232      +90     
- Partials          5       13       +8     
Flag Coverage Δ
sql-engine 97.28% <26.86%> (-1.07%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...ain/java/org/opensearch/sql/analysis/Analyzer.java 99.15% <0.00%> (-0.85%) ⬇️
...java/org/opensearch/sql/executor/QueryService.java 81.48% <0.00%> (-18.52%) ⬇️
.../sql/executor/execution/ContinuePaginatedPlan.java 0.00% <0.00%> (ø)
...ensearch/sql/executor/execution/PaginatedPlan.java 0.00% <0.00%> (ø)
...org/opensearch/sql/planner/DefaultImplementor.java 96.96% <0.00%> (-3.04%) ⬇️
...a/org/opensearch/sql/planner/PaginateOperator.java 0.00% <0.00%> (ø)
...pensearch/sql/planner/logical/LogicalPaginate.java 0.00% <0.00%> (ø)
...ch/sql/planner/logical/LogicalPlanNodeVisitor.java 95.65% <0.00%> (-4.35%) ⬇️
.../opensearch/sql/planner/physical/PhysicalPlan.java 100.00% <ø> (ø)
.../sql/planner/physical/PhysicalPlanNodeVisitor.java 95.00% <0.00%> (-5.00%) ⬇️
... and 21 more

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

Copy link
Collaborator

@Yury-Fridlyand Yury-Fridlyand left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if user sends fetch_size in a prometheus query?


@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
assert this.child == null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can't have assert in code. Maybe throw something if you need?


@RequiredArgsConstructor
@Data
static class SeriazationContext {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static class SeriazationContext {
static class SerializationContext {

*/
public PhysicalPlan convertToPlan(String cursor) {
// TODO HACKY_HACK -- create a plan
if (cursor.startsWith("You got it!")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe: make a constant while you are working on it? It will be easier to track it usage.

return new PaginateOperator(new ProjectOperator(scan, references, List.of()), 5, pageIndex);

} else {
throw new RuntimeException("Unsupported cursor");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe: return more info to simplify debugging? Note: cursor string is pretty long

if (cursor.startsWith("You got it!")) {
int pageIndex = Integer.parseInt(cursor.substring("You got it!".length()));

Table table = storageEngine.getTable(null, "phrases");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm - table/column names are hardcoded while WIP, right?
Consider multiple tables - join is coming...


@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe: throw or call explain from a regular plan?

@Getter
private final int pageIndex;

int numReturned = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private? Is it just a page counter?

@@ -17,7 +18,8 @@
*/
public abstract class PhysicalPlan implements PlanNode<PhysicalPlan>,
Iterator<ExprValue>,
AutoCloseable {
AutoCloseable,
Serializable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

@@ -108,4 +108,7 @@ public boolean pushDownHighlight(LogicalHighlight highlight) {
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableScanBuilder(this, context);
}

public void pushDownOffset(int i) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

@@ -19,7 +20,7 @@
/**
* OpenSearch search request.
*/
public interface OpenSearchRequest {
public interface OpenSearchRequest extends Serializable {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants