Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft implementation of pagination in v2 query engine. #1289

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRareTopN;
Expand Down Expand Up @@ -529,6 +531,12 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
return new LogicalML(child, node.getArguments());
}

/**
 * Analyzes the first child of a {@link Paginate} node and wraps the resulting logical plan
 * in a {@link LogicalPaginate} that carries the same page size.
 */
@Override
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
  final LogicalPlan analyzedChild = paginate.getChild().get(0).accept(this, context);
  final int pageSize = paginate.getPageSize();
  return new LogicalPaginate(pageSize, List.of(analyzedChild));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -289,4 +290,8 @@ public T visitQuery(Query node, C context) {
/**
 * Visits an {@link Explain} node by delegating to {@link #visitStatement}.
 */
public T visitExplain(Explain node, C context) {
  return this.visitStatement(node, context);
}

/**
 * Visits a {@link Paginate} node by delegating to {@link #visitChildren}.
 */
public T visitPaginate(Paginate paginate, C context) {
  return this.visitChildren(paginate, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
45 changes: 45 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Paginate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
public class Paginate extends UnresolvedPlan {
@Getter
private final int pageSize;
private UnresolvedPlan child;

public Paginate(int pageSize, UnresolvedPlan child) {
this.pageSize = pageSize;
this.child = child;
}

@Override
public List<? extends Node> getChild() {
return List.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitPaginate(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
assert this.child == null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can't have assert in code. Maybe throw something if you need?

this.child = child;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.opensearch.executor.Cursor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand Down Expand Up @@ -53,6 +54,8 @@ void execute(PhysicalPlan plan, ExecutionContext context,
class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;

private final Cursor cursor;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.opensearch.executor.Cursor;
import org.opensearch.sql.planner.PaginateOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
import org.opensearch.sql.planner.physical.ProjectOperator;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

@RequiredArgsConstructor
public class PaginatedPlanCache {
private final StorageEngine storageEngine;
public static final PaginatedPlanCache None = new PaginatedPlanCache(null);

@RequiredArgsConstructor
@Data
static class SeriazationContext {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static class SeriazationContext {
static class SerializationContext {

private final PaginatedPlanCache cache;
}

public static class SerializationVisitor
extends PhysicalPlanNodeVisitor<byte[], SeriazationContext> {
private static final byte[] NO_CURSOR = new byte[] {};

@Override
public byte[] visitPaginate(PaginateOperator node, SeriazationContext context) {
// Save cursor to read the next page.
// Could process node.getChild() here with another visitor -- one that saves the
// parameters for other physical operators -- ProjectOperator, etc.
return String.format("You got it!%d", node.getPageIndex() + 1).getBytes();
}

// Cursor is returned only if physical plan node is PaginateOerator.
@Override
protected byte[] visitNode(PhysicalPlan node, SeriazationContext context) {
return NO_CURSOR;
}
}

/**
* Converts a physical plan tree to a cursor. May cache plan related data somewhere.
*/
public Cursor convertToCursor(PhysicalPlan plan) {
var serializer = new SerializationVisitor();
var raw = plan.accept(serializer, new SeriazationContext(this));
return new Cursor(raw);
}

/**
* Convers a cursor to a physical plann tree.
*/
public PhysicalPlan convertToPlan(String cursor) {
// TODO HACKY_HACK -- create a plan
if (cursor.startsWith("You got it!")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe: make a constant while you are working on it? It will be easier to track it usage.

int pageIndex = Integer.parseInt(cursor.substring("You got it!".length()));

Table table = storageEngine.getTable(null, "phrases");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm - table/column names are hardcoded while WIP, right?
Consider multiple tables - join is coming...

TableScanBuilder scanBuilder = table.createScanBuilder();
scanBuilder.pushDownOffset(5 * pageIndex);
PhysicalPlan scan = scanBuilder.build();
var fields = table.getFieldTypes();
List<NamedExpression> references =
Stream.of("phrase", "test field", "insert_time2")
.map(c ->
new NamedExpression(c, new ReferenceExpression(c, List.of(c), fields.get(c))))
.collect(Collectors.toList());

return new PaginateOperator(new ProjectOperator(scan, references, List.of()), 5, pageIndex);

} else {
throw new RuntimeException("Unsupported cursor");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe: return more info to simplify debugging? Note: cursor string is pretty long

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* Query id of {@link AbstractPlan}.
*/
public class QueryId {
public static final QueryId None = new QueryId("");
/**
* Query id.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ public void executePlan(LogicalPlan plan,
}
}

/**
 * Runs an already-built physical plan on the execution engine with an empty execution
 * context; no analysis or planning is performed. An exception thrown while doing so is
 * passed to {@code listener.onFailure} instead of being propagated.
 *
 * @param plan physical plan to execute
 * @param listener listener handed to the execution engine and notified of such failures
 */
public void executePlan(PhysicalPlan plan,
                        ResponseListener<ExecutionEngine.QueryResponse> listener) {
  try {
    final ExecutionContext emptyContext = ExecutionContext.emptyExecutionContext();
    executionEngine.execute(plan, emptyContext, listener);
  } catch (Exception failure) {
    listener.onFailure(failure);
  }
}

/**
* Explain the query in {@link UnresolvedPlan} using {@link ResponseListener} to
* get and format explain response.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.PaginatedPlanCache;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
 * Plan that continues a paginated query: the cursor is converted back into a physical plan
 * by the {@link PaginatedPlanCache}, and that plan is executed by the {@link QueryService}.
 */
public class ContinuePaginatedPlan extends AbstractPlan {

  /** Placeholder instance: empty cursor, no query service, no plan cache, no listener. */
  public static final ContinuePaginatedPlan None =
      new ContinuePaginatedPlan(QueryId.None, "", null, null, null);

  private final String cursor;
  private final QueryService queryService;
  private final PaginatedPlanCache paginatedPlanCache;
  private final ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener;

  /**
   * Create an abstract plan that can continue paginating a given cursor.
   *
   * @param queryId id of this plan
   * @param cursor cursor to continue from
   * @param queryService service used to execute the rebuilt physical plan
   * @param ppc cache used to convert the cursor back into a physical plan
   * @param queryResponseListener listener passed along when the plan is executed
   */
  public ContinuePaginatedPlan(QueryId queryId, String cursor, QueryService queryService,
                               PaginatedPlanCache ppc,
                               ResponseListener<ExecutionEngine.QueryResponse>
                                   queryResponseListener) {
    super(queryId);
    this.queryService = queryService;
    this.paginatedPlanCache = ppc;
    this.cursor = cursor;
    this.queryResponseListener = queryResponseListener;
  }

  /** Rebuilds the physical plan from the cursor and executes it. */
  @Override
  public void execute() {
    queryService.executePlan(paginatedPlanCache.convertToPlan(cursor), queryResponseListener);
  }

  /**
   * Not supported for query continuation.
   *
   * @throws NotImplementedException always
   */
  @Override
  public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
    throw new NotImplementedException("Explain of query continuation is not supported");
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

public class PaginatedPlan extends AbstractPlan {
final UnresolvedPlan plan;
final int fetchSize;
final QueryService queryService;
final ResponseListener<ExecutionEngine.QueryResponse>
queryResponseResponseListener;

/**
* Create an abstract plan that can start paging a query.
*/
public PaginatedPlan(QueryId queryId, UnresolvedPlan plan, int fetchSize,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse>
queryResponseResponseListener) {
super(queryId);
this.plan = plan;
this.fetchSize = fetchSize;
this.queryService = queryService;
this.queryResponseResponseListener = queryResponseResponseListener;
}

@Override
public void execute() {
queryService.execute(new Paginate(fetchSize, plan), queryResponseResponseListener);
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe: throw or call explain from a regular plan?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.PaginatedPlanCache;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

Expand All @@ -37,6 +38,7 @@ public class QueryPlanFactory
* Query Service.
*/
private final QueryService queryService;
private final PaginatedPlanCache paginatedPlanCache;

/**
* NO_CONSUMER_RESPONSE_LISTENER should never be called. It is only used as constructor
Expand Down Expand Up @@ -69,6 +71,16 @@ public AbstractPlan create(
return statement.accept(this, Pair.of(queryListener, explainListener));
}

/**
 * Creates a {@link ContinuePaginatedPlan} that continues from the given cursor. The plan
 * gets its id from {@code QueryId.queryId()} and uses this factory's query service and
 * paginated plan cache.
 *
 * @param cursor cursor to continue from
 * @param queryResponseListener listener passed to the created plan
 * @return the plan to execute
 */
public AbstractPlan create(String cursor,
                           ResponseListener<ExecutionEngine.QueryResponse>
                               queryResponseListener) {
  return new ContinuePaginatedPlan(
      QueryId.queryId(), cursor, queryService, paginatedPlanCache, queryResponseListener);
}

@Override
public AbstractPlan visitQuery(
Query node,
Expand All @@ -79,7 +91,13 @@ public AbstractPlan visitQuery(
Preconditions.checkArgument(
context.getLeft().isPresent(), "[BUG] query listener must be not null");

return new QueryPlan(QueryId.queryId(), node.getPlan(), queryService, context.getLeft().get());
if (node.getFetchSize() > 0) {
return new PaginatedPlan(QueryId.queryId(), node.getPlan(), node.getFetchSize(), queryService,
context.getLeft().get());
} else {
return new QueryPlan(QueryId.queryId(), node.getPlan(), queryService,
context.getLeft().get());
}
}

@Override
Expand All @@ -97,4 +115,5 @@ public AbstractPlan visitExplain(
create(node.getStatement(), Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
context.getRight().get());
}

}
Loading