-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft implementation of pagination in v2 query engine. #1289
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.ast.tree; | ||
|
||
import java.util.List; | ||
import lombok.EqualsAndHashCode; | ||
import lombok.Getter; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.ToString; | ||
import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
import org.opensearch.sql.ast.Node; | ||
|
||
@RequiredArgsConstructor | ||
@EqualsAndHashCode(callSuper = false) | ||
@ToString | ||
public class Paginate extends UnresolvedPlan { | ||
@Getter | ||
private final int pageSize; | ||
private UnresolvedPlan child; | ||
|
||
public Paginate(int pageSize, UnresolvedPlan child) { | ||
this.pageSize = pageSize; | ||
this.child = child; | ||
} | ||
|
||
@Override | ||
public List<? extends Node> getChild() { | ||
return List.of(child); | ||
} | ||
|
||
@Override | ||
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
return nodeVisitor.visitPaginate(this, context); | ||
} | ||
|
||
@Override | ||
public UnresolvedPlan attach(UnresolvedPlan child) { | ||
assert this.child == null; | ||
this.child = child; | ||
return this; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,91 @@ | ||||||
/* | ||||||
* Copyright OpenSearch Contributors | ||||||
* SPDX-License-Identifier: Apache-2.0 | ||||||
*/ | ||||||
|
||||||
package org.opensearch.sql.executor; | ||||||
|
||||||
import java.io.ByteArrayInputStream; | ||||||
import java.io.IOException; | ||||||
import java.io.ObjectInputStream; | ||||||
import java.util.List; | ||||||
import java.util.stream.Collectors; | ||||||
import java.util.stream.Stream; | ||||||
import lombok.Data; | ||||||
import lombok.RequiredArgsConstructor; | ||||||
import org.opensearch.sql.expression.NamedExpression; | ||||||
import org.opensearch.sql.expression.ReferenceExpression; | ||||||
import org.opensearch.sql.opensearch.executor.Cursor; | ||||||
import org.opensearch.sql.planner.PaginateOperator; | ||||||
import org.opensearch.sql.planner.physical.PhysicalPlan; | ||||||
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; | ||||||
import org.opensearch.sql.planner.physical.ProjectOperator; | ||||||
import org.opensearch.sql.storage.StorageEngine; | ||||||
import org.opensearch.sql.storage.Table; | ||||||
import org.opensearch.sql.storage.read.TableScanBuilder; | ||||||
|
||||||
@RequiredArgsConstructor | ||||||
public class PaginatedPlanCache { | ||||||
private final StorageEngine storageEngine; | ||||||
public static final PaginatedPlanCache None = new PaginatedPlanCache(null); | ||||||
|
||||||
@RequiredArgsConstructor | ||||||
@Data | ||||||
static class SeriazationContext { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
private final PaginatedPlanCache cache; | ||||||
} | ||||||
|
||||||
public static class SerializationVisitor | ||||||
extends PhysicalPlanNodeVisitor<byte[], SeriazationContext> { | ||||||
private static final byte[] NO_CURSOR = new byte[] {}; | ||||||
|
||||||
@Override | ||||||
public byte[] visitPaginate(PaginateOperator node, SeriazationContext context) { | ||||||
// Save cursor to read the next page. | ||||||
// Could process node.getChild() here with another visitor -- one that saves the | ||||||
// parameters for other physical operators -- ProjectOperator, etc. | ||||||
return String.format("You got it!%d", node.getPageIndex() + 1).getBytes(); | ||||||
} | ||||||
|
||||||
// Cursor is returned only if physical plan node is PaginateOerator. | ||||||
@Override | ||||||
protected byte[] visitNode(PhysicalPlan node, SeriazationContext context) { | ||||||
return NO_CURSOR; | ||||||
} | ||||||
} | ||||||
|
||||||
/** | ||||||
* Converts a physical plan tree to a cursor. May cache plan related data somewhere. | ||||||
*/ | ||||||
public Cursor convertToCursor(PhysicalPlan plan) { | ||||||
var serializer = new SerializationVisitor(); | ||||||
var raw = plan.accept(serializer, new SeriazationContext(this)); | ||||||
return new Cursor(raw); | ||||||
} | ||||||
|
||||||
/** | ||||||
* Convers a cursor to a physical plann tree. | ||||||
*/ | ||||||
public PhysicalPlan convertToPlan(String cursor) { | ||||||
// TODO HACKY_HACK -- create a plan | ||||||
if (cursor.startsWith("You got it!")) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe: make a constant while you are working on it? It will be easier to track it usage. |
||||||
int pageIndex = Integer.parseInt(cursor.substring("You got it!".length())); | ||||||
|
||||||
Table table = storageEngine.getTable(null, "phrases"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to confirm - table/column names are hardcoded while WIP, right? |
||||||
TableScanBuilder scanBuilder = table.createScanBuilder(); | ||||||
scanBuilder.pushDownOffset(5 * pageIndex); | ||||||
PhysicalPlan scan = scanBuilder.build(); | ||||||
var fields = table.getFieldTypes(); | ||||||
List<NamedExpression> references = | ||||||
Stream.of("phrase", "test field", "insert_time2") | ||||||
.map(c -> | ||||||
new NamedExpression(c, new ReferenceExpression(c, List.of(c), fields.get(c)))) | ||||||
.collect(Collectors.toList()); | ||||||
|
||||||
return new PaginateOperator(new ProjectOperator(scan, references, List.of()), 5, pageIndex); | ||||||
|
||||||
} else { | ||||||
throw new RuntimeException("Unsupported cursor"); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe: return more info to simplify debugging? Note: cursor string is pretty long |
||||||
} | ||||||
} | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.executor.execution; | ||
|
||
import org.apache.commons.lang3.NotImplementedException; | ||
import org.opensearch.sql.common.response.ResponseListener; | ||
import org.opensearch.sql.executor.ExecutionEngine; | ||
import org.opensearch.sql.executor.PaginatedPlanCache; | ||
import org.opensearch.sql.executor.QueryId; | ||
import org.opensearch.sql.executor.QueryService; | ||
import org.opensearch.sql.planner.physical.PhysicalPlan; | ||
|
||
public class ContinuePaginatedPlan extends AbstractPlan { | ||
|
||
public static final ContinuePaginatedPlan None | ||
= new ContinuePaginatedPlan(QueryId.None, "", null, | ||
null, null); | ||
private final String cursor; | ||
private final QueryService queryService; | ||
private final PaginatedPlanCache paginatedPlanCache; | ||
|
||
private final ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener; | ||
|
||
|
||
/** | ||
* Create an abstract plan that can continue paginating a given cursor. | ||
*/ | ||
public ContinuePaginatedPlan(QueryId queryId, String cursor, QueryService queryService, | ||
PaginatedPlanCache ppc, | ||
ResponseListener<ExecutionEngine.QueryResponse> | ||
queryResponseListener) { | ||
super(queryId); | ||
this.cursor = cursor; | ||
this.paginatedPlanCache = ppc; | ||
this.queryService = queryService; | ||
this.queryResponseListener = queryResponseListener; | ||
} | ||
|
||
@Override | ||
public void execute() { | ||
PhysicalPlan plan = paginatedPlanCache.convertToPlan(cursor); | ||
queryService.executePlan(plan, queryResponseListener); | ||
} | ||
|
||
@Override | ||
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) { | ||
throw new NotImplementedException("Explain of query continuation is not supported"); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.executor.execution; | ||
|
||
import org.opensearch.sql.ast.tree.Paginate; | ||
import org.opensearch.sql.ast.tree.UnresolvedPlan; | ||
import org.opensearch.sql.common.response.ResponseListener; | ||
import org.opensearch.sql.executor.ExecutionEngine; | ||
import org.opensearch.sql.executor.QueryId; | ||
import org.opensearch.sql.executor.QueryService; | ||
|
||
public class PaginatedPlan extends AbstractPlan { | ||
final UnresolvedPlan plan; | ||
final int fetchSize; | ||
final QueryService queryService; | ||
final ResponseListener<ExecutionEngine.QueryResponse> | ||
queryResponseResponseListener; | ||
|
||
/** | ||
* Create an abstract plan that can start paging a query. | ||
*/ | ||
public PaginatedPlan(QueryId queryId, UnresolvedPlan plan, int fetchSize, | ||
QueryService queryService, | ||
ResponseListener<ExecutionEngine.QueryResponse> | ||
queryResponseResponseListener) { | ||
super(queryId); | ||
this.plan = plan; | ||
this.fetchSize = fetchSize; | ||
this.queryService = queryService; | ||
this.queryResponseResponseListener = queryResponseResponseListener; | ||
} | ||
|
||
@Override | ||
public void execute() { | ||
queryService.execute(new Paginate(fetchSize, plan), queryResponseResponseListener); | ||
} | ||
|
||
@Override | ||
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe: |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we can't have assert in code. Maybe
throw
something if you need?