-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft implementation of pagination in v2 query engine. #1289
Conversation
Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
Signed-off-by: MaxKsyunz <maxk@bitquilltech.com>
Codecov Report
@@ Coverage Diff @@
## main #1289 +/- ##
============================================
- Coverage 98.34% 97.28% -1.07%
- Complexity 3600 3603 +3
============================================
Files 343 349 +6
Lines 8908 9014 +106
Branches 567 569 +2
============================================
+ Hits 8761 8769 +8
- Misses 142 232 +90
- Partials 5 13 +8
Flags with carried forward coverage won't be shown. Click here to find out more.
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if user sends fetch_size
in a prometheus query?
|
||
@Override | ||
public UnresolvedPlan attach(UnresolvedPlan child) { | ||
assert this.child == null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we can't have assert in code. Maybe throw
something if you need?
|
||
@RequiredArgsConstructor | ||
@Data | ||
static class SeriazationContext { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static class SeriazationContext { | |
static class SerializationContext { |
*/ | ||
public PhysicalPlan convertToPlan(String cursor) { | ||
// TODO HACKY_HACK -- create a plan | ||
if (cursor.startsWith("You got it!")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe: make a constant while you are working on it? It will be easier to track it usage.
return new PaginateOperator(new ProjectOperator(scan, references, List.of()), 5, pageIndex); | ||
|
||
} else { | ||
throw new RuntimeException("Unsupported cursor"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe: return more info to simplify debugging? Note: cursor string is pretty long
if (cursor.startsWith("You got it!")) { | ||
int pageIndex = Integer.parseInt(cursor.substring("You got it!".length())); | ||
|
||
Table table = storageEngine.getTable(null, "phrases"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm - table/column names are hardcoded while WIP, right?
Consider multiple tables - join
is coming...
|
||
@Override | ||
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe: throw
or call explain
from a regular plan?
@Getter | ||
private final int pageIndex; | ||
|
||
int numReturned = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private
? Is it just a page counter?
@@ -17,7 +18,8 @@ | |||
*/ | |||
public abstract class PhysicalPlan implements PlanNode<PhysicalPlan>, | |||
Iterator<ExprValue>, | |||
AutoCloseable { | |||
AutoCloseable, | |||
Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
@@ -108,4 +108,7 @@ public boolean pushDownHighlight(LogicalHighlight highlight) { | |||
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) { | |||
return visitor.visitTableScanBuilder(this, context); | |||
} | |||
|
|||
public void pushDownOffset(int i) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
@@ -19,7 +20,7 @@ | |||
/** | |||
* OpenSearch search request. | |||
*/ | |||
public interface OpenSearchRequest { | |||
public interface OpenSearchRequest extends Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
This PR shows a prototype of pagination in v2 engine.
It is purely for feedback and will not be merged.
Detailed description of query processing
Here's how a REST query that requests a page is processed by the SQL engine.
There are two types of page requests -- get first page and get a subsequent page.
Get first page request includes a query to execute. It is handled exactly the same as non-page requests.
Get subsequent page request includes cursor information. It is executed the same as other requests but does not need parse, analyze, or plan steps.
Step 1 — routing
If query is
{"query" : "<statement>", "fetch_size": <number>}
, it is passed toSQLService.execute
. Only a page may be returned with a cursor ID to get more.Else if query is
{"cursor" : "<cursor_id>" }
andcursor_id
is a v2 cursor, it is passed toSQLService.execute
. Otherwises it's processed by the v1 engine.Step 2 — SQLService.execute
Creates an
AbstractPlan
and submits it toQueryManager.submit
.IF query is a v2 cursor -- creates
ContinuePaginatedPlan
. SeeQueryPlanFactory.create(String cursor,...
ELSE IF query is a statement and contains
fetch_size > 0
, parses the statement and creates aPaginatedPlan
.Submits the created abastract plan to
QueryManager.submit
.See
SQLService.plan
,PaginatedPlan,
ContinuePaginatedPlan`.Step 3 — QueryManager.submit (OpenSearchQueryManager)
Schedules
queryPlan.execute()
as before.Step 4 — PaginatedPlan.execute
Wraps the statement plan into
Paginate
unresolved plan and passes it toQueryService.execute
.QueryService.execute
processes the unresolved plan as before to generate a physical plan.Physical plan is passed to
ExecutionEngine.execute
.Paginate — LogicalPaginate — PaginateOperator
Paginate
unresolved plan is mapped toPaginateOperator
.PaginateOperator
only returns up tofetch_size
number of values from its input plan.Step 4' — ContinuePaginatedPlan.execute
Uses
PaginatedPlanCache
to get a saved physical plan.Passes it to
QueryService.executePlan
-- by-passes analysis and planning.QueryService.executePlan(PhysicalPlan)
passes it toExecutionEngine.execute
.PaginatedPlanCache
An object that knows how to serialize a physical plan into a cursor and deserialize a physical plan from a cursor.
Simplest (current) implementation only works with one kind of physical plan. It saves unique parameters in the cursor and, conversely, generates physical plan from parameters encoded in the cursor. This is similar to how v1 engine works.
Future option could be using a different storage method, such as an OpenSearch index, for an arbitrary physical plan.
Step 5 — OpenSearchExecutionEngine.execute
Executes physical plan as usual.
After all values from execution are saved, uses
PaginatedPlanCache.convertToCursor
to possibly generate a cursor.Passes the cursor to
QueryResponse
and calls response listeneronResponse.
PaginatedPlanCache.convertToCursor
If passed physical plan starts with
PaginateOperator
, creates a cursor to that, when executed, would get the next page.Currently, all relevant data will be encoded in the cursor id. In the future, it could be stored some place else -- like an index.
Step 6 — Response
QueryResponse
object is processed byJdbcResponseFormatter
.JdbcResponseFormatter
will addcursor
field ifQueryResponse
contains a non-null cursor.Open Problems
How to synchronize
PaginateOperator
andOpenSearchScrollRequest
andOpenSearchIndexScan
. Ideally, when REST query hasfetch_size
and index scanning uses scroll requests, scroll window will matchfetch_size
.How to store unconsumed output rows when
fetch_size
inPaginateOperator
is less than max winow size. Say uses is trying to paginate a simpleselect *
query.By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.