Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Push down limit through eval #2927

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.EvalPushDown;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
Expand Down Expand Up @@ -46,6 +47,7 @@ public static LogicalPlanOptimizer create() {
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort(),
EvalPushDown.PUSH_DOWN_LIMIT,
/*
* Phase 2: Transformations that rely on data source push down capability
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Optional;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
Expand Down Expand Up @@ -63,6 +64,10 @@ public static <T extends LogicalPlan> Pattern<LogicalProject> project(Pattern<T>
return Pattern.typeOf(LogicalProject.class).with(source(pattern));
}

public static Pattern<LogicalEval> evalCapture() {
return Pattern.typeOf(LogicalEval.class).capturedAs(Capture.newCapture());
}

/** Pattern for {@link TableScanBuilder} and capture it meanwhile. */
public static Pattern<TableScanBuilder> scanBuilder() {
return Pattern.typeOf(TableScanBuilder.class).capturedAs(Capture.newCapture());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.evalCapture;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.limit;
import static org.opensearch.sql.planner.optimizer.rule.EvalPushDown.EvalPushDownBuilder.match;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.matching.pattern.CapturePattern;
import com.facebook.presto.matching.pattern.WithPattern;
import java.util.List;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.Rule;

/**
* Rule template for all rules related to push down logical plans under eval, so these plans can
* avoid blocking by eval and may have chances to be pushed down into table scan by rules in {@link
* org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown}.
*/
public class EvalPushDown<T extends LogicalPlan> implements Rule<T> {

// TODO: Add more rules to push down sort and project
/** Push down optimize rule for limit operator. Transform `limit -> eval` to `eval -> limit` */
public static final Rule<LogicalLimit> PUSH_DOWN_LIMIT =
match(limit(evalCapture()))
.apply(
(limit, logicalEval) -> {
List<LogicalPlan> child = logicalEval.getChild();
limit.replaceChildPlans(child);
logicalEval.replaceChildPlans(List.of(limit));
return logicalEval;
});

private final Capture<LogicalEval> capture;

@Accessors(fluent = true)
@Getter
private final Pattern<T> pattern;

private final BiFunction<T, LogicalEval, LogicalPlan> pushDownFunction;

@SuppressWarnings("unchecked")
public EvalPushDown(
WithPattern<T> pattern, BiFunction<T, LogicalEval, LogicalPlan> pushDownFunction) {
this.pattern = pattern;
this.capture = ((CapturePattern<LogicalEval>) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
}

@Override
public LogicalPlan apply(T plan, Captures captures) {
LogicalEval logicalEval = captures.get(capture);
return pushDownFunction.apply(plan, logicalEval);
}

static class EvalPushDownBuilder<T extends LogicalPlan> {

private WithPattern<T> pattern;

public static <T extends LogicalPlan> EvalPushDown.EvalPushDownBuilder<T> match(
Pattern<T> pattern) {
EvalPushDown.EvalPushDownBuilder<T> builder = new EvalPushDown.EvalPushDownBuilder<>();
builder.pattern = (WithPattern<T>) pattern;
return builder;
}

public EvalPushDown<T> apply(BiFunction<T, LogicalEval, LogicalPlan> pushDownFunction) {
return new EvalPushDown<>(pattern, pushDownFunction);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.opensearch.sql.data.model.ExprValueUtils.longValue;
import static org.opensearch.sql.data.type.ExprCoreType.*;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.aggregation;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.eval;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.highlight;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.planner.logical.LogicalPaginate;
Expand Down Expand Up @@ -345,6 +347,27 @@ void table_scan_builder_support_offset_push_down_can_apply_its_rule() {
assertEquals(project(tableScanBuilder), optimized);
}

/** Limit - Eval --> Eval - Limit. */
@Test
void push_limit_under_eval() {
Pair<ReferenceExpression, Expression> evalExpr =
Pair.of(DSL.ref("name1", STRING), DSL.ref("name", STRING));
assertEquals(
eval(limit(tableScanBuilder, 10, 5), evalExpr),
optimize(limit(eval(relation("schema", table), evalExpr), 10, 5)));
}

/** Limit - Eval - Scan --> Eval - Scan. */
@Test
void push_limit_through_eval_into_scan() {
when(tableScanBuilder.pushDownLimit(any())).thenReturn(true);
Pair<ReferenceExpression, Expression> evalExpr =
Pair.of(DSL.ref("name1", STRING), DSL.ref("name", STRING));
assertEquals(
eval(tableScanBuilder, evalExpr),
optimize(limit(eval(relation("schema", table), evalExpr), 10, 5)));
}

private LogicalPlan optimize(LogicalPlan plan) {
final LogicalPlanOptimizer optimizer = LogicalPlanOptimizer.create();
return optimizer.optimize(plan);
Expand Down
13 changes: 13 additions & 0 deletions integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ public void testSortPushDownExplain() throws Exception {
+ "| fields age"));
}

@Test
public void testLimitPushDownExplain() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_limit_push.json");

assertJsonEquals(
expected,
explainQueryToString(
"source=opensearch-sql_test_index_account"
+ "| eval ageMinus = age - 30 "
+ "| head 5 "
+ "| fields ageMinus"));
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"root": {
"name": "ProjectOperator",
"description": {
"fields": "[ageMinus]"
},
"children": [
{
"name": "EvalOperator",
"description": {
"expressions": {
"ageMinus": "-(age, 30)"
}
},
"children": [
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, searchDone=false)"
},
"children": []
}
]
}
]
}
}
Loading